-- Error recovery code for thrift logfile

Summary:
- perform some basic corruption checks:
   1) Event larger than chunk
   2) Event larger than specified max
   3) Event crossing chunk boundary etc.
- If error encountered, then try to perform some recovery

Reviewed By: Slee

Test Plan: Going to test now...need to check in because of compile issues

Notes:
- These checks take care of the case when there is a dirty read from the filesystem (which
  we have encountered with the netapps). The recovery involves trying to perform the read
  again from ths FS and if that fails skipping the chunk altogether.
  Keep in mind that this might only be useful for idempotent systems (e.g. search redolog).


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664943 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aditya Agarwal 2007-01-24 23:39:10 +00:00
parent c425780847
commit 68f08ee5f5
2 changed files with 120 additions and 34 deletions

View File

@ -424,7 +424,7 @@ bool TFileTransport::readEvent() {
}
while (1) {
// check if there is anything in the read buffer
// read from the file if read buffer is exhausted
if (readState_.bufferPtr_ == readState_.bufferLen_) {
// advance the offset pointer
offset_ += readState_.bufferLen_;
@ -480,7 +480,6 @@ bool TFileTransport::readEvent() {
readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
readBuff_[readState_.bufferPtr_++];
bool eventCorruption = false;
if (readState_.eventSizeBuffPos_ == 4) {
// 0 length event indicates padding
if (*((uint32_t *)(readState_.eventSizeBuff_)) == 0) {
@ -496,18 +495,12 @@ bool TFileTransport::readEvent() {
readState_.event_ = new eventInfo();
readState_.event_->eventSize_ = *((uint32_t *)(readState_.eventSizeBuff_));
// TODO
// make sure event is valid, an error is triggered if:
// 1. Event size is larger than user-speficied max-event size
// 2. Event size is larger than chunk size
// 3. size indicates that event crosses chunk boundary
}
if (eventCorruption) {
// perform some kickass recovery
// check if the event is corrupted and perform recovery if required
if (isEventCorrupted()) {
performRecovery();
// start from the top
break;
}
}
} else {
if (!readState_.event_->eventBuff_) {
@ -527,11 +520,6 @@ bool TFileTransport::readEvent() {
readState_.event_->eventBuffPos_ += reclaimBuffer;
readState_.bufferPtr_ += reclaimBuffer;
// if (reclaimBuffer > 0) {
// T_DEBUG_L(0, "eventBuffPost: %u", readState_.event_->eventBuffPos_);
// T_DEBUG_L(0, "eventSize: %u", readState_.event_->eventSize_);
// }
// check if the event has been read in full
if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
// set the completed event to the current event
@ -542,16 +530,77 @@ bool TFileTransport::readEvent() {
readState_.resetState(readState_.bufferPtr_);
// exit criteria
// T_DEBUG_L(0, "Finished one event");
return true;
}
}
}
}
}
bool TFileTransport::isEventCorrupted() {
// an error is triggered if:
if ( (maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) {
// 1. Event size is larger than user-speficied max-event size
T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",
readState_.event_->eventSize_, maxEventSize_);
return true;
} else if (readState_.event_->eventSize_ > chunkSize_) {
// 2. Event size is larger than chunk size
T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",
readState_.event_->eventSize_, chunkSize_);
return true;
} else if( ((offset_ + readState_.bufferPtr_ - 4)/chunkSize_) !=
((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)/chunkSize_) ) {
// 3. size indicates that event crosses chunk boundary
T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%ld",
readState_.event_->eventSize_, offset_ + readState_.bufferPtr_ + 4);
return true;
}
return false;
}
void TFileTransport::performRecovery() {
// perform some kickass recovery
uint32_t curChunk = getCurChunk();
if (lastBadChunk == curChunk) {
numCorruptedEventsinChunk++;
} else {
lastBadChunk = curChunk;
numCorruptedEventsinChunk = 1;
}
if (numCorruptedEventsinChunk < maxCorruptedEvents_) {
// maybe there was an error in reading the file from disk
// seek to the beginning of chunk and try again
seekToChunk(curChunk);
} else {
// just skip ahead to the next chunk if we not already at the last chunk
if (curChunk != (getNumChunks() - 1)) {
seekToChunk(curChunk + 1);
} else if (readTimeout_ == TAIL_READ_TIMEOUT) {
// if tailing the file, wait until there is enough data to start
// the next chunk
while(curChunk == (getNumChunks() - 1)) {
usleep(DEFAULT_CORRUPTED_SLEEP_TIME_US);
}
seekToChunk(curChunk + 1);
} else {
// pretty hosed at this stage, rewind the file back to the last successful
// point and punt on the error
readState_.resetState(readState_.lastDispatchPtr_);
char errorMsg[1024];
sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu",
offset_ + readState_.lastDispatchPtr_);
perror(errorMsg);
throw TTransportException(errorMsg);
}
}
}
void TFileTransport::seekToChunk(int32_t chunk) {
if (fd_ <= 0) {
throw TTransportException("File not open");
@ -656,7 +705,22 @@ uint32_t TFileTransport::getCurrentTime() {
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> protocolFactory,
shared_ptr<TFileTransport> inputTransport):
processor_(processor), protocolFactory_(protocolFactory),
processor_(processor),
inputProtocolFactory_(protocolFactory),
outputProtocolFactory_(protocolFactory),
inputTransport_(inputTransport) {
// default the output transport to a null transport (common case)
outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());
}
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> inputProtocolFactory,
shared_ptr<TProtocolFactory> outputProtocolFactory,
shared_ptr<TFileTransport> inputTransport):
processor_(processor),
inputProtocolFactory_(inputProtocolFactory),
outputProtocolFactory_(outputProtocolFactory),
inputTransport_(inputTransport) {
// default the output transport to a null transport (common case)
@ -667,13 +731,15 @@ TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> protocolFactory,
shared_ptr<TFileTransport> inputTransport,
shared_ptr<TTransport> outputTransport):
processor_(processor), protocolFactory_(protocolFactory),
inputTransport_(inputTransport), outputTransport_(outputTransport) {
};
processor_(processor),
inputProtocolFactory_(protocolFactory),
outputProtocolFactory_(protocolFactory),
inputTransport_(inputTransport),
outputTransport_(outputTransport) {};
void TFileProcessor::process(uint32_t numEvents, bool tail) {
pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
iop = protocolFactory_->getIOProtocols(inputTransport_, outputTransport_);
shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
// set the read timeout to 0 if tailing is required
int32_t oldReadTimeout = inputTransport_->getReadTimeout();
@ -687,7 +753,7 @@ void TFileProcessor::process(uint32_t numEvents, bool tail) {
// bad form to use exceptions for flow control but there is really
// no other way around it
try {
processor_->process(iop.first, iop.second);
processor_->process(inputProtocol, outputProtocol);
numProcessed++;
if ( (numEvents > 0) && (numProcessed == numEvents)) {
return;
@ -710,8 +776,8 @@ void TFileProcessor::process(uint32_t numEvents, bool tail) {
}
void TFileProcessor::processChunk() {
pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
iop = protocolFactory_->getIOProtocols(inputTransport_, outputTransport_);
shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
uint32_t curChunk = inputTransport_->getCurChunk();
@ -719,7 +785,7 @@ void TFileProcessor::processChunk() {
// bad form to use exceptions for flow control but there is really
// no other way around it
try {
processor_->process(iop.first, iop.second);
processor_->process(inputProtocol, outputProtocol);
if (curChunk != inputTransport_->getCurChunk()) {
break;
}

View File

@ -119,6 +119,8 @@ class TFileTransport : public TTransport {
return readBuffSize_;
}
static const int32_t TAIL_READ_TIMEOUT = -1;
static const int32_t NO_TAIL_READ_TIMEOUT = 0;
void setReadTimeout(int32_t readTimeout) {
readTimeout_ = readTimeout;
}
@ -205,6 +207,10 @@ class TFileTransport : public TTransport {
// helper functions for reading from a file
bool readEvent();
// event corruption-related functions
bool isEventCorrupted();
void performRecovery();
// Utility functions
void openLogFile();
uint32_t getCurrentTime();
@ -252,6 +258,10 @@ class TFileTransport : public TTransport {
// sleep duration when EOF is hit
uint32_t eofSleepTime_;
static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
// sleep duration when a corrupted event is encountered
uint32_t corruptedEventSleepTime_;
static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;
// writer thread id
pthread_t writerThreadId_;
@ -279,6 +289,10 @@ class TFileTransport : public TTransport {
// Offset within the file
off_t offset_;
// event corruption information
uint32_t lastBadChunk;
uint32_t numCorruptedEventsinChunk;
};
// Exception thrown when EOF is hit
@ -303,6 +317,11 @@ class TFileProcessor {
shared_ptr<TProtocolFactory> protocolFactory,
shared_ptr<TFileTransport> inputTransport);
TFileProcessor(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> inputProtocolFactory,
shared_ptr<TProtocolFactory> outputProtocolFactory,
shared_ptr<TFileTransport> inputTransport);
/**
* Constructor
*
@ -314,7 +333,7 @@ class TFileProcessor {
TFileProcessor(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> protocolFactory,
shared_ptr<TFileTransport> inputTransport,
shared_ptr<TTransport> outputTransport);
shared_ptr<TTransport> outputTransport);
/**
* processes events from the file
@ -332,7 +351,8 @@ class TFileProcessor {
private:
shared_ptr<TProcessor> processor_;
shared_ptr<TProtocolFactory> protocolFactory_;
shared_ptr<TProtocolFactory> inputProtocolFactory_;
shared_ptr<TProtocolFactory> outputProtocolFactory_;
shared_ptr<TFileTransport> inputTransport_;
shared_ptr<TTransport> outputTransport_;
};