Skip to content

Commit

Permalink
perform checks when readFile == writeFile
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed May 30, 2023
1 parent 637281a commit c2aa60b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
5 changes: 3 additions & 2 deletions diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (d *diskQueue) readOne() ([]byte, error) {
// we only consider rotating if we're reading a "complete" file
// and since we cannot know the size at which it was rotated, we
// rely on maxBytesPerFileRead rather than maxBytesPerFile
if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead {
if d.readFileNum <= d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead {
if d.readFile != nil {
d.readFile.Close()
d.readFile = nil
Expand Down Expand Up @@ -394,6 +394,7 @@ func (d *diskQueue) writeOne(data []byte) error {
d.writeFile = nil
}
}

if d.writeFile == nil {
curFileName := d.fileName(d.writeFileNum)
d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
Expand Down Expand Up @@ -675,7 +676,7 @@ func (d *diskQueue) ioLoop() {
count = 0
}

if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
if d.readFileNum < d.writeFileNum || (d.readFileNum == d.writeFileNum && d.readPos < d.writePos) {
if d.nextReadPos == d.readPos {
dataRead, err = d.readOne()
if err != nil {
Expand Down
16 changes: 11 additions & 5 deletions diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,17 +310,22 @@ func TestDiskQueueCorruption(t *testing.T) {
dqFn := dq.(*diskQueue).fileName(1)
os.Truncate(dqFn, 400) // 3 valid messages, 5 corrupted

for i := 0; i < 19; i++ { // 1 message leftover in 4th file
Equal(t, msg, <-dq.ReadChan())
}

// corrupt the 4th (current) file
dqFn = dq.(*diskQueue).fileName(3)
os.Truncate(dqFn, 100)

for i := 0; i < 18; i++ {
Equal(t, msg, <-dq.ReadChan())
}
Equal(t, int64(7), dq.Depth())

Equal(t, msg, <-dq.ReadChan())
Equal(t, int64(0), dq.Depth())

dq.Put(msg) // in 5th file

Equal(t, msg, <-dq.ReadChan())
Equal(t, int64(0), dq.Depth())

// write a corrupt (len 0) message at the 5th (current) file
dq.(*diskQueue).writeFile.Write([]byte{0, 0, 0, 0})
Expand All @@ -333,14 +338,15 @@ func TestDiskQueueCorruption(t *testing.T) {

dq.Put(msg)
dq.Put(msg)

// corrupt the last file
dqFn = dq.(*diskQueue).fileName(5)
os.Truncate(dqFn, 100)

Equal(t, int64(2), dq.Depth())

// return one message and try reading again from corrupted file
<-dq.ReadChan()
Equal(t, msg, <-dq.ReadChan())

// give diskqueue time to handle read error
time.Sleep(50 * time.Millisecond)
Expand Down

0 comments on commit c2aa60b

Please sign in to comment.