mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 10:48:51 +00:00
THRIFT-638. php: BufferedTransport + C extensions block until recv timeout is reached on last fread call
This patch refactors TSocket to make use of stream_select() for timeout detection. Patch: Nicholas Telford git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1076917 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a971fb0948
commit
c46f32ce18
@ -50,25 +50,40 @@ class TSocket extends TTransport {
|
||||
protected $port_ = '9090';
|
||||
|
||||
/**
|
||||
* Send timeout in milliseconds
|
||||
* Send timeout in seconds.
|
||||
*
|
||||
* Combined with sendTimeoutUsec this is used for send timeouts.
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $sendTimeout_ = 100;
|
||||
private $sendTimeoutSec_ = 0;
|
||||
|
||||
/**
|
||||
* Recv timeout in milliseconds
|
||||
* Send timeout in microseconds.
|
||||
*
|
||||
* Combined with sendTimeoutSec this is used for send timeouts.
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $recvTimeout_ = 750;
|
||||
private $sendTimeoutUsec_ = 100000;
|
||||
|
||||
/**
|
||||
* Is send timeout set?
|
||||
* Recv timeout in seconds
|
||||
*
|
||||
* @var bool
|
||||
* Combined with recvTimeoutUsec this is used for recv timeouts.
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $sendTimeoutSet_ = FALSE;
|
||||
private $recvTimeoutSec_ = 0;
|
||||
|
||||
/**
|
||||
* Recv timeout in microseconds
|
||||
*
|
||||
* Combined with recvTimeoutSec this is used for recv timeouts.
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $recvTimeoutUsec_ = 750000;
|
||||
|
||||
/**
|
||||
* Persistent socket or plain?
|
||||
@ -123,7 +138,9 @@ class TSocket extends TTransport {
|
||||
* @param int $timeout Timeout in milliseconds.
|
||||
*/
|
||||
public function setSendTimeout($timeout) {
|
||||
$this->sendTimeout_ = $timeout;
|
||||
$this->sendTimeoutSec_ = floor($timeout / 1000);
|
||||
$this->sendTimeoutUsec_ =
|
||||
($timeout - ($this->sendTimeoutSec_ * 1000)) * 1000;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -132,7 +149,9 @@ class TSocket extends TTransport {
|
||||
* @param int $timeout Timeout in milliseconds.
|
||||
*/
|
||||
public function setRecvTimeout($timeout) {
|
||||
$this->recvTimeout_ = $timeout;
|
||||
$this->recvTimeoutSec_ = floor($timeout / 1000);
|
||||
$this->recvTimeoutUsec_ =
|
||||
($timeout - ($this->recvTimeoutSec_ * 1000)) * 1000;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -192,13 +211,13 @@ class TSocket extends TTransport {
|
||||
$this->port_,
|
||||
$errno,
|
||||
$errstr,
|
||||
$this->sendTimeout_/1000.0);
|
||||
$this->sendTimeoutSec_ + ($this->sendTimeoutUsec_ / 1000000));
|
||||
} else {
|
||||
$this->handle_ = @fsockopen($this->host_,
|
||||
$this->port_,
|
||||
$errno,
|
||||
$errstr,
|
||||
$this->sendTimeout_/1000.0);
|
||||
$this->sendTimeoutSec_ + ($this->sendTimeoutUsec_ / 1000000));
|
||||
}
|
||||
|
||||
// Connect failed?
|
||||
@ -209,9 +228,6 @@ class TSocket extends TTransport {
|
||||
}
|
||||
throw new TException($error);
|
||||
}
|
||||
|
||||
stream_set_timeout($this->handle_, 0, $this->sendTimeout_*1000);
|
||||
$this->sendTimeoutSet_ = TRUE;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -225,66 +241,30 @@ class TSocket extends TTransport {
|
||||
}
|
||||
|
||||
/**
|
||||
* Uses stream get contents to do the reading
|
||||
* Read from the socket at most $len bytes.
|
||||
*
|
||||
* @param int $len How many bytes
|
||||
* @return string Binary data
|
||||
*/
|
||||
public function readAll($len) {
|
||||
if ($this->sendTimeoutSet_) {
|
||||
stream_set_timeout($this->handle_, 0, $this->recvTimeout_*1000);
|
||||
$this->sendTimeoutSet_ = FALSE;
|
||||
}
|
||||
// This call does not obey stream_set_timeout values!
|
||||
// $buf = @stream_get_contents($this->handle_, $len);
|
||||
|
||||
$pre = null;
|
||||
while (TRUE) {
|
||||
$buf = @fread($this->handle_, $len);
|
||||
if ($buf === FALSE) {
|
||||
$md = stream_get_meta_data($this->handle_);
|
||||
if (true === $md['timed_out'] && false === $md['blocked']) {
|
||||
throw new TTransportException('TSocket: timed out reading '.$len.' bytes from '.
|
||||
$this->host_.':'.$this->port_);
|
||||
} else {
|
||||
throw new TTransportException('TSocket: Could not read '.$len.' bytes from '.
|
||||
$this->host_.':'.$this->port_);
|
||||
}
|
||||
}
|
||||
else if (($sz = strlen($buf)) < $len) {
|
||||
if((strlen($buf) == 0) && feof($this->handle_)){
|
||||
throw new TTransportException('TSocket read 0 bytes');
|
||||
};
|
||||
|
||||
$md = stream_get_meta_data($this->handle_);
|
||||
if (true === $md['timed_out'] && false === $md['blocked']) {
|
||||
throw new TTransportException('TSocket: timed out reading '.$len.' bytes from '.
|
||||
$this->host_.':'.$this->port_);
|
||||
} else {
|
||||
$pre .= $buf;
|
||||
$len -= $sz;
|
||||
}
|
||||
} else {
|
||||
return $pre.$buf;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read from the socket
|
||||
* This method will not wait for all the requested data, it will return as
|
||||
* soon as any data is received.
|
||||
*
|
||||
* @param int $len How many bytes
|
||||
* @param int $len Maximum number of bytes to read.
|
||||
* @return string Binary data
|
||||
*/
|
||||
public function read($len) {
|
||||
if ($this->sendTimeoutSet_) {
|
||||
stream_set_timeout($this->handle_, 0, $this->recvTimeout_*1000);
|
||||
$this->sendTimeoutSet_ = FALSE;
|
||||
}
|
||||
$data = @fread($this->handle_, $len);
|
||||
if ($data === FALSE) {
|
||||
$md = stream_get_meta_data($this->handle_);
|
||||
if (true === $md['timed_out'] && false === $md['blocked']) {
|
||||
$null = null;
|
||||
$read = array($this->handle_);
|
||||
$readable = @stream_select($read, $null, $null, $this->recvTimeoutSec_, $this->recvTimeoutUsec_);
|
||||
|
||||
if ($readable > 0) {
|
||||
$data = @stream_socket_recvfrom($this->handle_, $len);
|
||||
if ($data === false) {
|
||||
throw new TTransportException('TSocket: Could not read '.$len.' bytes from '.
|
||||
$this->host_.':'.$this->port_);
|
||||
} elseif($data == '' && feof($this->handle_)) {
|
||||
throw new TTransportException('TSocket read 0 bytes');
|
||||
}
|
||||
|
||||
return $data;
|
||||
} else if ($readable === 0) {
|
||||
throw new TTransportException('TSocket: timed out reading '.$len.' bytes from '.
|
||||
$this->host_.':'.$this->port_);
|
||||
} else {
|
||||
@ -292,12 +272,6 @@ class TSocket extends TTransport {
|
||||
$this->host_.':'.$this->port_);
|
||||
}
|
||||
}
|
||||
elseif((strlen($data) == 0) && feof($this->handle_))
|
||||
{
|
||||
throw new TTransportException('TSocket read 0 bytes');
|
||||
};
|
||||
return $data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write to the socket.
|
||||
@ -305,15 +279,23 @@ class TSocket extends TTransport {
|
||||
* @param string $buf The data to write
|
||||
*/
|
||||
public function write($buf) {
|
||||
if (!$this->sendTimeoutSet_) {
|
||||
stream_set_timeout($this->handle_, 0, $this->sendTimeout_*1000);
|
||||
$this->sendTimeoutSet_ = TRUE;
|
||||
}
|
||||
$null = null;
|
||||
$write = array($this->handle_);
|
||||
|
||||
// keep writing until all the data has been written
|
||||
while (strlen($buf) > 0) {
|
||||
$got = @fwrite($this->handle_, $buf);
|
||||
if ($got === 0 || $got === FALSE) {
|
||||
$md = stream_get_meta_data($this->handle_);
|
||||
if ($md['timed_out']) {
|
||||
// wait for stream to become available for writing
|
||||
$writable = @stream_select($null, $write, $null, $this->sendTimeoutSec_, $this->sendTimeoutUsec_);
|
||||
if ($writable > 0) {
|
||||
// write buffer to stream
|
||||
$written = @stream_socket_sendto($this->handle_, $buf);
|
||||
if ($written === -1 || $written === false) {
|
||||
throw new TTransportException('TSocket: Could not write '.strlen($buf).' bytes '.
|
||||
$this->host_.':'.$this->port_);
|
||||
}
|
||||
// determine how much of the buffer is left to write
|
||||
$buf = substr($buf, $written);
|
||||
} else if ($writable === 0) {
|
||||
throw new TTransportException('TSocket: timed out writing '.strlen($buf).' bytes from '.
|
||||
$this->host_.':'.$this->port_);
|
||||
} else {
|
||||
@ -321,18 +303,18 @@ class TSocket extends TTransport {
|
||||
$this->host_.':'.$this->port_);
|
||||
}
|
||||
}
|
||||
$buf = substr($buf, $got);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush output to the socket.
|
||||
*
|
||||
* Since read(), readAll() and write() operate on the sockets directly,
|
||||
* this is a no-op
|
||||
*
|
||||
* If you wish to have flushable buffering behaviour, wrap this TSocket
|
||||
* in a TBufferedTransport.
|
||||
*/
|
||||
public function flush() {
|
||||
$ret = fflush($this->handle_);
|
||||
if ($ret === FALSE) {
|
||||
throw new TException('TSocket: Could not flush: '.
|
||||
$this->host_.':'.$this->port_);
|
||||
// no-op
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user