mirror of
https://github.com/valitydev/thrift.git
synced 2024-11-07 02:45:22 +00:00
THRIFT-377. java: TFileTransport port in Java
This patch adds TFileTransport to the java library. This transport is not a general-purpose file transport; instead, it is more of a way to execute one-way RPC via an offline file process. Patch: Joydeep Sen Sarma git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1028136 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8c9c8bf8f0
commit
b6722bf8c4
130
lib/java/src/org/apache/thrift/transport/TFileProcessor.java
Normal file
130
lib/java/src/org/apache/thrift/transport/TFileProcessor.java
Normal file
@ -0,0 +1,130 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import org.apache.thrift.TProcessor;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.protocol.TProtocol;
|
||||
import org.apache.thrift.protocol.TProtocolFactory;
|
||||
|
||||
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* FileProcessor: helps in processing files generated by TFileTransport.
|
||||
* Port of original cpp implementation
|
||||
*
|
||||
* @author Joydeep Sen Sarma <jssarma@facebook.com>
|
||||
*/
|
||||
public class TFileProcessor {
|
||||
|
||||
private TProcessor processor_;
|
||||
private TProtocolFactory inputProtocolFactory_;
|
||||
private TProtocolFactory outputProtocolFactory_;
|
||||
private TFileTransport inputTransport_;
|
||||
private TTransport outputTransport_;
|
||||
|
||||
public TFileProcessor(TProcessor processor, TProtocolFactory protocolFactory,
|
||||
TFileTransport inputTransport,
|
||||
TTransport outputTransport) {
|
||||
processor_ = processor;
|
||||
inputProtocolFactory_ = outputProtocolFactory_ = protocolFactory;
|
||||
inputTransport_ = inputTransport;
|
||||
outputTransport_ = outputTransport;
|
||||
}
|
||||
|
||||
public TFileProcessor(TProcessor processor,
|
||||
TProtocolFactory inputProtocolFactory,
|
||||
TProtocolFactory outputProtocolFactory,
|
||||
TFileTransport inputTransport,
|
||||
TTransport outputTransport) {
|
||||
processor_ = processor;
|
||||
inputProtocolFactory_ = inputProtocolFactory;
|
||||
outputProtocolFactory_ = outputProtocolFactory;
|
||||
inputTransport_ = inputTransport;
|
||||
outputTransport_ = outputTransport;
|
||||
}
|
||||
|
||||
private void processUntil(int lastChunk) throws TException {
|
||||
TProtocol ip = inputProtocolFactory_.getProtocol(inputTransport_);
|
||||
TProtocol op = outputProtocolFactory_.getProtocol(outputTransport_);
|
||||
int curChunk = inputTransport_.getCurChunk();
|
||||
|
||||
try {
|
||||
while (lastChunk >= curChunk) {
|
||||
processor_.process(ip, op);
|
||||
int newChunk = inputTransport_.getCurChunk();
|
||||
curChunk = newChunk;
|
||||
}
|
||||
} catch (TTransportException e) {
|
||||
// if we are processing the last chunk - we could have just hit EOF
|
||||
// on EOF - trap the error and stop processing.
|
||||
if(e.getType() != TTransportException.END_OF_FILE)
|
||||
throw e;
|
||||
else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process from start to last chunk both inclusive where chunks begin from 0
|
||||
|
||||
* @param startChunkNum first chunk to be processed
|
||||
* @param lastChunkNum last chunk to be processed
|
||||
*/
|
||||
public void processChunk(int startChunkNum, int endChunkNum) throws TException {
|
||||
int numChunks = inputTransport_.getNumChunks();
|
||||
if(endChunkNum < 0)
|
||||
endChunkNum += numChunks;
|
||||
|
||||
if(startChunkNum < 0)
|
||||
startChunkNum += numChunks;
|
||||
|
||||
if(endChunkNum < startChunkNum)
|
||||
throw new TException("endChunkNum " + endChunkNum + " is less than " + startChunkNum);
|
||||
|
||||
inputTransport_.seekToChunk(startChunkNum);
|
||||
processUntil(endChunkNum);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a single chunk
|
||||
*
|
||||
* @param chunkNum chunk to be processed
|
||||
*/
|
||||
public void processChunk(int chunkNum) throws TException {
|
||||
processChunk(chunkNum, chunkNum);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a current chunk
|
||||
*/
|
||||
public void processChunk() throws TException {
|
||||
processChunk(inputTransport_.getCurChunk());
|
||||
}
|
||||
}
|
628
lib/java/src/org/apache/thrift/transport/TFileTransport.java
Normal file
628
lib/java/src/org/apache/thrift/transport/TFileTransport.java
Normal file
@ -0,0 +1,628 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
* FileTransport implementation of the TTransport interface.
|
||||
* Currently this is a straightforward port of the cpp implementation
|
||||
*
|
||||
* It may make better sense to provide a basic stream access on top of the framed file format
|
||||
* The FileTransport can then be a user of this framed file format with some additional logic
|
||||
* for chunking.
|
||||
*
|
||||
* @author Joydeep Sen Sarma <jssarma@facebook.com>
|
||||
*/
|
||||
public class TFileTransport extends TTransport {
|
||||
|
||||
public static class truncableBufferedInputStream extends BufferedInputStream {
|
||||
public void trunc() {
|
||||
pos = count = 0;
|
||||
}
|
||||
public truncableBufferedInputStream(InputStream in) {
|
||||
super(in);
|
||||
}
|
||||
public truncableBufferedInputStream(InputStream in, int size) {
|
||||
super(in, size);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static class Event {
|
||||
private byte[] buf_;
|
||||
private int nread_;
|
||||
private int navailable_;
|
||||
|
||||
/**
|
||||
* Initialize an event. Initially, it has no valid contents
|
||||
*
|
||||
* @param buf byte array buffer to store event
|
||||
*/
|
||||
public Event(byte[] buf) {
|
||||
buf_ = buf;
|
||||
nread_ = navailable_ = 0;
|
||||
}
|
||||
|
||||
public byte[] getBuf() { return buf_;}
|
||||
public int getSize() { return buf_.length; }
|
||||
|
||||
|
||||
public void setAvailable(int sz) { nread_ = 0; navailable_=sz;}
|
||||
public int getRemaining() { return (navailable_ - nread_); }
|
||||
|
||||
public int emit(byte[] buf, int offset, int ndesired) {
|
||||
if((ndesired == 0) || (ndesired > getRemaining()))
|
||||
ndesired = getRemaining();
|
||||
|
||||
if(ndesired <= 0)
|
||||
return (ndesired);
|
||||
|
||||
System.arraycopy(buf_, nread_, buf, offset, ndesired);
|
||||
nread_ += ndesired;
|
||||
|
||||
return(ndesired);
|
||||
}
|
||||
};
|
||||
|
||||
public static class chunkState {
|
||||
/**
|
||||
* Chunk Size. Must be same across all implementations
|
||||
*/
|
||||
public static final int DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
|
||||
|
||||
private int chunk_size_ = DEFAULT_CHUNK_SIZE;
|
||||
private long offset_ = 0;
|
||||
|
||||
public chunkState() {}
|
||||
public chunkState(int chunk_size) { chunk_size_ = chunk_size; }
|
||||
|
||||
public void skip(int size) {offset_ += size; }
|
||||
public void seek(long offset) {offset_ = offset;}
|
||||
|
||||
public int getChunkSize() { return chunk_size_;}
|
||||
public int getChunkNum() { return ((int)(offset_/chunk_size_));}
|
||||
public int getRemaining() { return (chunk_size_ - ((int)(offset_ % chunk_size_)));}
|
||||
public long getOffset() { return (offset_);}
|
||||
}
|
||||
|
||||
public static enum tailPolicy {
|
||||
|
||||
NOWAIT(0, 0),
|
||||
WAIT_FOREVER(500, -1);
|
||||
|
||||
/**
|
||||
* Time in milliseconds to sleep before next read
|
||||
* If 0, no sleep
|
||||
*/
|
||||
public final int timeout_;
|
||||
|
||||
/**
|
||||
* Number of retries before giving up
|
||||
* if 0, no retries
|
||||
* if -1, retry forever
|
||||
*/
|
||||
public final int retries_;
|
||||
|
||||
/**
|
||||
* ctor for policy
|
||||
*
|
||||
* @param timeout sleep time for this particular policy
|
||||
* @param retries number of retries
|
||||
*/
|
||||
|
||||
tailPolicy(int timeout, int retries) {
|
||||
timeout_ = timeout;
|
||||
retries_ = retries;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Current tailing policy
|
||||
*/
|
||||
tailPolicy currentPolicy_ = tailPolicy.NOWAIT;
|
||||
|
||||
|
||||
/**
|
||||
* Underlying file being read
|
||||
*/
|
||||
protected TSeekableFile inputFile_ = null;
|
||||
|
||||
/**
|
||||
* Underlying outputStream
|
||||
*/
|
||||
protected OutputStream outputStream_ = null;
|
||||
|
||||
|
||||
/**
|
||||
* Event currently read in
|
||||
*/
|
||||
Event currentEvent_ = null;
|
||||
|
||||
/**
|
||||
* InputStream currently being used for reading
|
||||
*/
|
||||
InputStream inputStream_ = null;
|
||||
|
||||
/**
|
||||
* current Chunk state
|
||||
*/
|
||||
chunkState cs = null;
|
||||
|
||||
/**
|
||||
* Read timeout
|
||||
*/
|
||||
private int readTimeout_ = 0;
|
||||
|
||||
/**
|
||||
* is read only?
|
||||
*/
|
||||
private boolean readOnly_ = false;
|
||||
|
||||
/**
|
||||
* Get File Tailing Policy
|
||||
*
|
||||
* @return current read policy
|
||||
*/
|
||||
public tailPolicy getTailPolicy() {
|
||||
return (currentPolicy_);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set file Tailing Policy
|
||||
*
|
||||
* @param policy New policy to set
|
||||
* @return Old policy
|
||||
*/
|
||||
public tailPolicy setTailPolicy(tailPolicy policy) {
|
||||
tailPolicy old = currentPolicy_;
|
||||
currentPolicy_ = policy;
|
||||
return (old);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Initialize read input stream
|
||||
*
|
||||
* @return input stream to read from file
|
||||
*/
|
||||
private InputStream createInputStream() throws TTransportException {
|
||||
InputStream is;
|
||||
try {
|
||||
if(inputStream_ != null) {
|
||||
((truncableBufferedInputStream)inputStream_).trunc();
|
||||
is = inputStream_;
|
||||
} else {
|
||||
is = new truncableBufferedInputStream(inputFile_.getInputStream());
|
||||
}
|
||||
} catch (IOException iox) {
|
||||
System.err.println("createInputStream: "+iox.getMessage());
|
||||
throw new TTransportException(iox.getMessage(), iox);
|
||||
}
|
||||
return(is);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read (potentially tailing) an input stream
|
||||
*
|
||||
* @param is InputStream to read from
|
||||
* @param buf Buffer to read into
|
||||
* @param off Offset in buffer to read into
|
||||
* @param len Number of bytes to read
|
||||
* @param tp policy to use if we hit EOF
|
||||
*
|
||||
* @return number of bytes read
|
||||
*/
|
||||
private int tailRead(InputStream is, byte[] buf,
|
||||
int off, int len, tailPolicy tp) throws TTransportException {
|
||||
int orig_len = len;
|
||||
try {
|
||||
int retries = 0;
|
||||
while(len > 0) {
|
||||
int cnt = is.read(buf, off, len);
|
||||
if(cnt > 0) {
|
||||
off += cnt;
|
||||
len -= cnt;
|
||||
retries = 0;
|
||||
cs.skip(cnt); // remember that we read so many bytes
|
||||
} else if (cnt == -1) {
|
||||
// EOF
|
||||
retries++;
|
||||
|
||||
if((tp.retries_ != -1) && tp.retries_ < retries)
|
||||
return (orig_len - len);
|
||||
|
||||
if(tp.timeout_ > 0) {
|
||||
try {Thread.sleep(tp.timeout_);} catch(InterruptedException e) {}
|
||||
}
|
||||
} else {
|
||||
// either non-zero or -1 is what the contract says!
|
||||
throw new
|
||||
TTransportException("Unexpected return from InputStream.read = "
|
||||
+ cnt);
|
||||
}
|
||||
}
|
||||
} catch (IOException iox) {
|
||||
throw new TTransportException(iox.getMessage(), iox);
|
||||
}
|
||||
|
||||
return(orig_len - len);
|
||||
}
|
||||
|
||||
/**
|
||||
* Event is corrupted. Do recovery
|
||||
*
|
||||
* @return true if recovery could be performed and we can read more data
|
||||
* false is returned only when nothing more can be read
|
||||
*/
|
||||
private boolean performRecovery() throws TTransportException {
|
||||
int numChunks = getNumChunks();
|
||||
int curChunk = cs.getChunkNum();
|
||||
|
||||
if(curChunk >= (numChunks-1)) {
|
||||
return false;
|
||||
}
|
||||
seekToChunk(curChunk+1);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read event from underlying file
|
||||
*
|
||||
* @return true if event could be read, false otherwise (on EOF)
|
||||
*/
|
||||
private boolean readEvent() throws TTransportException {
|
||||
byte[] ebytes = new byte[4];
|
||||
int esize;
|
||||
int nread;
|
||||
int nrequested;
|
||||
|
||||
retry:
|
||||
do {
|
||||
// corner case. read to end of chunk
|
||||
nrequested = cs.getRemaining();
|
||||
if(nrequested < 4) {
|
||||
nread = tailRead(inputStream_, ebytes, 0, nrequested, currentPolicy_);
|
||||
if(nread != nrequested) {
|
||||
return(false);
|
||||
}
|
||||
}
|
||||
|
||||
// assuming serialized on little endian machine
|
||||
nread = tailRead(inputStream_, ebytes, 0, 4, currentPolicy_);
|
||||
if(nread != 4) {
|
||||
return(false);
|
||||
}
|
||||
|
||||
esize=0;
|
||||
for(int i=3; i>=0; i--) {
|
||||
int val = (0x000000ff & (int)ebytes[i]);
|
||||
esize |= (val << (i*8));
|
||||
}
|
||||
|
||||
// check if event is corrupted and do recovery as required
|
||||
if(esize > cs.getRemaining()) {
|
||||
throw new TTransportException("FileTransport error: bad event size");
|
||||
/*
|
||||
if(performRecovery()) {
|
||||
esize=0;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
*/
|
||||
}
|
||||
} while (esize == 0);
|
||||
|
||||
// reset existing event or get a larger one
|
||||
if(currentEvent_.getSize() < esize)
|
||||
currentEvent_ = new Event(new byte [esize]);
|
||||
|
||||
// populate the event
|
||||
byte[] buf = currentEvent_.getBuf();
|
||||
nread = tailRead(inputStream_, buf, 0, esize, currentPolicy_);
|
||||
if(nread != esize) {
|
||||
return(false);
|
||||
}
|
||||
currentEvent_.setAvailable(esize);
|
||||
return(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* open if both input/output open unless readonly
|
||||
*
|
||||
* @return true
|
||||
*/
|
||||
public boolean isOpen() {
|
||||
return ((inputStream_ != null) && (readOnly_ || (outputStream_ != null)));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Diverging from the cpp model and sticking to the TSocket model
|
||||
* Files are not opened in ctor - but in explicit open call
|
||||
*/
|
||||
public void open() throws TTransportException {
|
||||
if (isOpen())
|
||||
throw new TTransportException(TTransportException.ALREADY_OPEN);
|
||||
|
||||
try {
|
||||
inputStream_ = createInputStream();
|
||||
cs = new chunkState();
|
||||
currentEvent_ = new Event(new byte [256]);
|
||||
|
||||
if(!readOnly_)
|
||||
outputStream_ = new BufferedOutputStream(inputFile_.getOutputStream(), 8192);
|
||||
} catch (IOException iox) {
|
||||
throw new TTransportException(TTransportException.NOT_OPEN, iox);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the transport.
|
||||
*/
|
||||
public void close() {
|
||||
if (inputFile_ != null) {
|
||||
try {
|
||||
inputFile_.close();
|
||||
} catch (IOException iox) {
|
||||
System.err.println("WARNING: Error closing input file: " +
|
||||
iox.getMessage());
|
||||
}
|
||||
inputFile_ = null;
|
||||
}
|
||||
if (outputStream_ != null) {
|
||||
try {
|
||||
outputStream_.close();
|
||||
} catch (IOException iox) {
|
||||
System.err.println("WARNING: Error closing output stream: " +
|
||||
iox.getMessage());
|
||||
}
|
||||
outputStream_ = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* File Transport ctor
|
||||
*
|
||||
* @param path File path to read and write from
|
||||
* @param readOnly Whether this is a read-only transport
|
||||
*/
|
||||
public TFileTransport(final String path, boolean readOnly) throws IOException {
|
||||
inputFile_ = new TStandardFile(path);
|
||||
readOnly_ = readOnly;
|
||||
}
|
||||
|
||||
/**
|
||||
* File Transport ctor
|
||||
*
|
||||
* @param inputFile_ open TSeekableFile to read/write from
|
||||
* @param readOnly Whether this is a read-only transport
|
||||
*/
|
||||
public TFileTransport(TSeekableFile inputFile, boolean readOnly) {
|
||||
inputFile_ = inputFile;
|
||||
readOnly_ = readOnly;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Cloned from TTransport.java:readAll(). Only difference is throwing an EOF exception
|
||||
* where one is detected.
|
||||
*/
|
||||
public int readAll(byte[] buf, int off, int len)
|
||||
throws TTransportException {
|
||||
int got = 0;
|
||||
int ret = 0;
|
||||
while (got < len) {
|
||||
ret = read(buf, off+got, len-got);
|
||||
if (ret < 0) {
|
||||
throw new TTransportException("Error in reading from file");
|
||||
}
|
||||
if(ret == 0) {
|
||||
throw new TTransportException(TTransportException.END_OF_FILE,
|
||||
"End of File reached");
|
||||
}
|
||||
got += ret;
|
||||
}
|
||||
return got;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Reads up to len bytes into buffer buf, starting att offset off.
|
||||
*
|
||||
* @param buf Array to read into
|
||||
* @param off Index to start reading at
|
||||
* @param len Maximum number of bytes to read
|
||||
* @return The number of bytes actually read
|
||||
* @throws TTransportException if there was an error reading data
|
||||
*/
|
||||
public int read(byte[] buf, int off, int len) throws TTransportException {
|
||||
if(!isOpen())
|
||||
throw new TTransportException(TTransportException.NOT_OPEN,
|
||||
"Must open before reading");
|
||||
|
||||
if(currentEvent_.getRemaining() == 0) {
|
||||
if(!readEvent())
|
||||
return(0);
|
||||
}
|
||||
|
||||
int nread = currentEvent_.emit(buf, off, len);
|
||||
return nread;
|
||||
}
|
||||
|
||||
public int getNumChunks() throws TTransportException {
|
||||
if(!isOpen())
|
||||
throw new TTransportException(TTransportException.NOT_OPEN,
|
||||
"Must open before getNumChunks");
|
||||
try {
|
||||
long len = inputFile_.length();
|
||||
if(len == 0)
|
||||
return 0;
|
||||
else
|
||||
return (((int)(len/cs.getChunkSize())) + 1);
|
||||
|
||||
} catch (IOException iox) {
|
||||
throw new TTransportException(iox.getMessage(), iox);
|
||||
}
|
||||
}
|
||||
|
||||
public int getCurChunk() throws TTransportException {
|
||||
if(!isOpen())
|
||||
throw new TTransportException(TTransportException.NOT_OPEN,
|
||||
"Must open before getCurChunk");
|
||||
return (cs.getChunkNum());
|
||||
|
||||
}
|
||||
|
||||
|
||||
public void seekToChunk(int chunk) throws TTransportException {
|
||||
if(!isOpen())
|
||||
throw new TTransportException(TTransportException.NOT_OPEN,
|
||||
"Must open before seeking");
|
||||
|
||||
int numChunks = getNumChunks();
|
||||
|
||||
// file is empty, seeking to chunk is pointless
|
||||
if (numChunks == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// negative indicates reverse seek (from the end)
|
||||
if (chunk < 0) {
|
||||
chunk += numChunks;
|
||||
}
|
||||
|
||||
// too large a value for reverse seek, just seek to beginnin
|
||||
if (chunk < 0) {
|
||||
chunk = 0;
|
||||
}
|
||||
|
||||
long eofOffset=0;
|
||||
boolean seekToEnd = (chunk >= numChunks);
|
||||
if(seekToEnd) {
|
||||
chunk = chunk - 1;
|
||||
try { eofOffset = inputFile_.length(); }
|
||||
catch (IOException iox) {throw new TTransportException(iox.getMessage(),
|
||||
iox);}
|
||||
}
|
||||
|
||||
if(chunk*cs.getChunkSize() != cs.getOffset()) {
|
||||
try { inputFile_.seek((long)chunk*cs.getChunkSize()); }
|
||||
catch (IOException iox) {
|
||||
System.err.println("createInputStream: "+iox.getMessage());
|
||||
throw new TTransportException("Seek to chunk " +
|
||||
chunk + " " +iox.getMessage(), iox);
|
||||
}
|
||||
|
||||
cs.seek((long)chunk*cs.getChunkSize());
|
||||
currentEvent_.setAvailable(0);
|
||||
inputStream_ = createInputStream();
|
||||
}
|
||||
|
||||
if(seekToEnd) {
|
||||
// waiting forever here - otherwise we can hit EOF and end up
|
||||
// having consumed partial data from the data stream.
|
||||
tailPolicy old = setTailPolicy(tailPolicy.WAIT_FOREVER);
|
||||
while(cs.getOffset() < eofOffset) { readEvent(); }
|
||||
currentEvent_.setAvailable(0);
|
||||
setTailPolicy(old);
|
||||
}
|
||||
}
|
||||
|
||||
public void seekToEnd() throws TTransportException {
|
||||
if(!isOpen())
|
||||
throw new TTransportException(TTransportException.NOT_OPEN,
|
||||
"Must open before seeking");
|
||||
seekToChunk(getNumChunks());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Writes up to len bytes from the buffer.
|
||||
*
|
||||
* @param buf The output data buffer
|
||||
* @param off The offset to start writing from
|
||||
* @param len The number of bytes to write
|
||||
* @throws TTransportException if there was an error writing data
|
||||
*/
|
||||
public void write(byte[] buf, int off, int len) throws TTransportException {
|
||||
throw new TTransportException("Not Supported");
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush any pending data out of a transport buffer.
|
||||
*
|
||||
* @throws TTransportException if there was an error writing out data.
|
||||
*/
|
||||
public void flush() throws TTransportException {
|
||||
throw new TTransportException("Not Supported");
|
||||
}
|
||||
|
||||
/**
|
||||
* test program
|
||||
*
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
int num_chunks = 10;
|
||||
|
||||
if((args.length < 1) || args[0].equals("--help")
|
||||
|| args[0].equals("-h") || args[0].equals("-?")) {
|
||||
printUsage();
|
||||
}
|
||||
|
||||
if(args.length > 1) {
|
||||
try {
|
||||
num_chunks = Integer.parseInt(args[1]);
|
||||
} catch (Exception e) {
|
||||
System.err.println("Cannot parse " + args[1]);
|
||||
printUsage();
|
||||
}
|
||||
}
|
||||
|
||||
TFileTransport t = new TFileTransport(args[0], true);
|
||||
t.open();
|
||||
System.out.println("NumChunks="+t.getNumChunks());
|
||||
|
||||
Random r = new Random();
|
||||
for(int j=0; j<num_chunks; j++) {
|
||||
byte[] buf = new byte[4096];
|
||||
int cnum = r.nextInt(t.getNumChunks()-1);
|
||||
System.out.println("Reading chunk "+cnum);
|
||||
t.seekToChunk(cnum);
|
||||
for(int i=0; i<4096; i++) {
|
||||
t.read(buf, 0, 4096);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void printUsage() {
|
||||
System.err.println("Usage: TFileTransport <filename> [num_chunks]");
|
||||
System.err.println(" (Opens and reads num_chunks chunks from file randomly)");
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
}
|
33
lib/java/src/org/apache/thrift/transport/TSeekableFile.java
Normal file
33
lib/java/src/org/apache/thrift/transport/TSeekableFile.java
Normal file
@ -0,0 +1,33 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.IOException;
|
||||
|
||||
public interface TSeekableFile {
|
||||
|
||||
public InputStream getInputStream() throws IOException;
|
||||
public OutputStream getOutputStream() throws IOException;
|
||||
public void close() throws IOException;
|
||||
public long length() throws IOException;
|
||||
public void seek(long pos) throws IOException;
|
||||
}
|
60
lib/java/src/org/apache/thrift/transport/TStandardFile.java
Normal file
60
lib/java/src/org/apache/thrift/transport/TStandardFile.java
Normal file
@ -0,0 +1,60 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.thrift.transport;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
|
||||
public class TStandardFile implements TSeekableFile {
|
||||
|
||||
protected String path_ = null;
|
||||
protected RandomAccessFile inputFile_ = null;
|
||||
|
||||
public TStandardFile(String path) throws IOException {
|
||||
path_ = path;
|
||||
inputFile_ = new RandomAccessFile(path_, "r");
|
||||
}
|
||||
|
||||
public InputStream getInputStream() throws IOException {
|
||||
return new FileInputStream(inputFile_.getFD());
|
||||
}
|
||||
|
||||
public OutputStream getOutputStream() throws IOException {
|
||||
return new FileOutputStream(path_);
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
if(inputFile_ != null) {
|
||||
inputFile_.close();
|
||||
}
|
||||
}
|
||||
|
||||
public long length() throws IOException {
|
||||
return inputFile_.length();
|
||||
}
|
||||
|
||||
public void seek(long pos) throws IOException {
|
||||
inputFile_.seek(pos);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user