THRIFT-4451 Use a shared TcpStream between both Thrift clients in cross-test

Client: rs
This commit is contained in:
Allen George 2021-02-28 07:43:51 -05:00
parent 93ae7af5ba
commit 5cff2793c0
No known key found for this signature in database
GPG Key ID: 3DE10B8D7C53BBC5
2 changed files with 18 additions and 41 deletions

View File

@ -271,10 +271,6 @@
"cpp-py_multi_http-domain", "cpp-py_multi_http-domain",
"cpp-py_multi_http-ip", "cpp-py_multi_http-ip",
"cpp-py_multi_http-ip-ssl", "cpp-py_multi_http-ip-ssl",
"cpp-rs_multic_buffered-ip",
"cpp-rs_multic_framed-ip",
"cpp-rs_multi_buffered-ip",
"cpp-rs_multi_framed-ip",
"c_glib-netstd_binary_buffered-ip", "c_glib-netstd_binary_buffered-ip",
"c_glib-netstd_binary_framed-ip", "c_glib-netstd_binary_framed-ip",
"c_glib-netstd_compact_buffered-ip", "c_glib-netstd_compact_buffered-ip",
@ -283,10 +279,6 @@
"c_glib-netstd_multi-binary_framed-ip", "c_glib-netstd_multi-binary_framed-ip",
"c_glib-netstd_multic-compact_buffered-ip", "c_glib-netstd_multic-compact_buffered-ip",
"c_glib-netstd_multic-compact_framed-ip", "c_glib-netstd_multic-compact_framed-ip",
"c_glib-rs_multic_buffered-ip",
"c_glib-rs_multic_framed-ip",
"c_glib-rs_multi_buffered-ip",
"c_glib-rs_multi_framed-ip",
"d-cl_binary_buffered-ip", "d-cl_binary_buffered-ip",
"d-cl_binary_framed-ip", "d-cl_binary_framed-ip",
"d-cpp_binary_buffered-ip", "d-cpp_binary_buffered-ip",
@ -886,8 +878,6 @@
"perl-netstd_multi-binary_buffered-ip-ssl", "perl-netstd_multi-binary_buffered-ip-ssl",
"perl-netstd_multi-binary_framed-ip", "perl-netstd_multi-binary_framed-ip",
"perl-netstd_multi-binary_framed-ip-ssl", "perl-netstd_multi-binary_framed-ip-ssl",
"perl-rs_multi_buffered-ip",
"perl-rs_multi_framed-ip",
"py-cpp_accel-binary_http-domain", "py-cpp_accel-binary_http-domain",
"py-cpp_accel-binary_http-ip", "py-cpp_accel-binary_http-ip",
"py-cpp_accel-binary_http-ip-ssl", "py-cpp_accel-binary_http-ip-ssl",
@ -1186,14 +1176,6 @@
"py3-php_binary-accel_framed-ip", "py3-php_binary-accel_framed-ip",
"py3-php_json_buffered-ip", "py3-php_json_buffered-ip",
"py3-php_json_framed-ip", "py3-php_json_framed-ip",
"py3-rs_multia-multi_buffered-ip",
"py3-rs_multia-multi_framed-ip",
"py3-rs_multiac-multic_buffered-ip",
"py3-rs_multiac-multic_framed-ip",
"py3-rs_multic_buffered-ip",
"py3-rs_multic_framed-ip",
"py3-rs_multi_buffered-ip",
"py3-rs_multi_framed-ip",
"rb-cpp_json_buffered-domain", "rb-cpp_json_buffered-domain",
"rb-cpp_json_buffered-ip", "rb-cpp_json_buffered-ip",
"rb-cpp_json_buffered-ip-ssl", "rb-cpp_json_buffered-ip-ssl",

View File

@ -21,15 +21,16 @@ use clap::{clap_app, value_t};
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug; use std::fmt::Debug;
use std::net::TcpStream;
use thrift; use thrift;
use thrift::OrderedFloat; use thrift::OrderedFloat;
use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TCompactInputProtocol, use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TCompactInputProtocol,
TCompactOutputProtocol, TInputProtocol, TMultiplexedOutputProtocol, TCompactOutputProtocol, TInputProtocol, TMultiplexedOutputProtocol,
TOutputProtocol}; TOutputProtocol};
use thrift::transport::{ReadHalf, TBufferedReadTransport, TBufferedWriteTransport, use thrift::transport::{TBufferedReadTransport, TBufferedWriteTransport,
TFramedReadTransport, TFramedWriteTransport, TIoChannel, TReadTransport, TFramedReadTransport, TFramedWriteTransport, TIoChannel, TReadTransport,
TTcpChannel, TWriteTransport, WriteHalf}; TTcpChannel, TWriteTransport};
use thrift_test::*; use thrift_test::*;
fn main() { fn main() {
@ -71,19 +72,24 @@ fn run() -> thrift::Result<()> {
let transport = matches.value_of("transport").unwrap_or("buffered"); let transport = matches.value_of("transport").unwrap_or("buffered");
let protocol = matches.value_of("protocol").unwrap_or("binary"); let protocol = matches.value_of("protocol").unwrap_or("binary");
// create a TCPStream that will be shared by all Thrift clients
let mut thrift_test_client = { // service calls from multiple Thrift clients will be interleaved over the same connection
let (i_prot, o_prot) = build_protocols(host, port, transport, protocol, "ThriftTest")?; // this isn't a problem for us because we're single-threaded and all calls block to completion
ThriftTestSyncClient::new(i_prot, o_prot) let shared_stream = TcpStream::connect(format!("{}:{}", host, port))?;
};
let mut second_service_client = if protocol.starts_with("multi") { let mut second_service_client = if protocol.starts_with("multi") {
let (i_prot, o_prot) = build_protocols(host, port, transport, protocol, "SecondService")?; let shared_stream_clone = shared_stream.try_clone()?;
let (i_prot, o_prot) = build(shared_stream_clone, transport, protocol, "SecondService")?;
Some(SecondServiceSyncClient::new(i_prot, o_prot)) Some(SecondServiceSyncClient::new(i_prot, o_prot))
} else { } else {
None None
}; };
let mut thrift_test_client = {
let (i_prot, o_prot) = build(shared_stream, transport, protocol, "ThriftTest")?;
ThriftTestSyncClient::new(i_prot, o_prot)
};
info!( info!(
"connecting to {}:{} with {}+{} stack", "connecting to {}:{} with {}+{} stack",
host, host,
@ -99,14 +105,14 @@ fn run() -> thrift::Result<()> {
Ok(()) Ok(())
} }
fn build_protocols( fn build(
host: &str, stream: TcpStream,
port: u16,
transport: &str, transport: &str,
protocol: &str, protocol: &str,
service_name: &str, service_name: &str,
) -> thrift::Result<(Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>)> { ) -> thrift::Result<(Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>)> {
let (i_chan, o_chan) = tcp_channel(host, port)?; let c = TTcpChannel::with_stream(stream);
let (i_chan, o_chan) = c.split()?;
let (i_tran, o_tran): (Box<dyn TReadTransport>, Box<dyn TWriteTransport>) = match transport { let (i_tran, o_tran): (Box<dyn TReadTransport>, Box<dyn TWriteTransport>) = match transport {
"buffered" => { "buffered" => {
@ -148,17 +154,6 @@ fn build_protocols(
Ok((i_prot, o_prot)) Ok((i_prot, o_prot))
} }
// FIXME: expose "open" through the client interface so I don't have to early
// open
fn tcp_channel(
host: &str,
port: u16,
) -> thrift::Result<(ReadHalf<TTcpChannel>, WriteHalf<TTcpChannel>)> {
let mut c = TTcpChannel::new();
c.open(&format!("{}:{}", host, port))?;
c.split()
}
type BuildThriftTestClient = ThriftTestSyncClient<Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>>; type BuildThriftTestClient = ThriftTestSyncClient<Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>>;
type BuiltSecondServiceClient = SecondServiceSyncClient<Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>>; type BuiltSecondServiceClient = SecondServiceSyncClient<Box<dyn TInputProtocol>, Box<dyn TOutputProtocol>>;