|
|
|
@ -293,13 +293,11 @@ func (self *sequentialReader) ReadAt(target []byte, off int64) (n int, err error |
|
|
|
|
self.lock.Lock() |
|
|
|
|
// assert self.pos <= off
|
|
|
|
|
if self.pos > off { |
|
|
|
|
glog.V(logger.Error).Infof("non-sequential read attempted from sequentialReader; %d > %d", |
|
|
|
|
self.pos, off) |
|
|
|
|
glog.V(logger.Error).Infof("non-sequential read attempted from sequentialReader; %d > %d", self.pos, off) |
|
|
|
|
panic("Non-sequential read attempt") |
|
|
|
|
} |
|
|
|
|
if self.pos != off { |
|
|
|
|
glog.V(logger.Debug).Infof("deferred read in POST at position %d, offset %d.", |
|
|
|
|
self.pos, off) |
|
|
|
|
glog.V(logger.Debug).Infof("deferred read in POST at position %d, offset %d.", self.pos, off) |
|
|
|
|
wait := make(chan bool) |
|
|
|
|
self.ahead[off] = wait |
|
|
|
|
self.lock.Unlock() |
|
|
|
@ -315,8 +313,7 @@ func (self *sequentialReader) ReadAt(target []byte, off int64) (n int, err error |
|
|
|
|
for localPos < len(target) { |
|
|
|
|
n, err = self.reader.Read(target[localPos:]) |
|
|
|
|
localPos += n |
|
|
|
|
glog.V(logger.Debug).Infof("Read %d bytes into buffer size %d from POST, error %v.", |
|
|
|
|
n, len(target), err) |
|
|
|
|
glog.V(logger.Debug).Infof("Read %d bytes into buffer size %d from POST, error %v.", n, len(target), err) |
|
|
|
|
if err != nil { |
|
|
|
|
glog.V(logger.Debug).Infof("POST stream's reading terminated with %v.", err) |
|
|
|
|
for i := range self.ahead { |
|
|
|
@ -330,8 +327,7 @@ func (self *sequentialReader) ReadAt(target []byte, off int64) (n int, err error |
|
|
|
|
} |
|
|
|
|
wait := self.ahead[self.pos] |
|
|
|
|
if wait != nil { |
|
|
|
|
glog.V(logger.Debug).Infof("deferred read in POST at position %d triggered.", |
|
|
|
|
self.pos) |
|
|
|
|
glog.V(logger.Debug).Infof("deferred read in POST at position %d triggered.", self.pos) |
|
|
|
|
delete(self.ahead, self.pos) |
|
|
|
|
close(wait) |
|
|
|
|
} |
|
|
|
|