Skip to content

Commit

Permalink
fix: pull in fixes from nsqio#41
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasfreyr committed Oct 13, 2023
1 parent cc41549 commit 5a894f7
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 7 deletions.
5 changes: 3 additions & 2 deletions diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (d *diskQueue) readOne() ([]byte, error) {
// we only consider rotating if we're reading a "complete" file
// and since we cannot know the size at which it was rotated, we
// rely on maxBytesPerFileRead rather than maxBytesPerFile
if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead {
if d.readFileNum <= d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead {
if d.readFile != nil {
d.readFile.Close()
d.readFile = nil
Expand Down Expand Up @@ -394,6 +394,7 @@ func (d *diskQueue) writeOne(data []byte) error {
d.writeFile = nil
}
}

if d.writeFile == nil {
curFileName := d.fileName(d.writeFileNum)
d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
Expand Down Expand Up @@ -675,7 +676,7 @@ func (d *diskQueue) ioLoop() {
count = 0
}

if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
if d.readFileNum < d.writeFileNum || (d.readFileNum == d.writeFileNum && d.readPos < d.writePos) {
if d.nextReadPos == d.readPos {
dataRead, err = d.readOne()
if err != nil {
Expand Down
54 changes: 49 additions & 5 deletions diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"bufio"
"bytes"
"fmt"
"io/fs"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -308,17 +310,22 @@ func TestDiskQueueCorruption(t *testing.T) {
dqFn := dq.(*diskQueue).fileName(1)
os.Truncate(dqFn, 400) // 3 valid messages, 5 corrupted

for i := 0; i < 19; i++ { // 1 message leftover in 4th file
Equal(t, msg, <-dq.ReadChan())
}

// corrupt the 4th (current) file
dqFn = dq.(*diskQueue).fileName(3)
os.Truncate(dqFn, 100)

for i := 0; i < 18; i++ {
Equal(t, msg, <-dq.ReadChan())
}
Equal(t, int64(7), dq.Depth())

Equal(t, msg, <-dq.ReadChan())
Equal(t, int64(0), dq.Depth())

dq.Put(msg) // in 5th file

Equal(t, msg, <-dq.ReadChan())
Equal(t, int64(0), dq.Depth())

// write a corrupt (len 0) message at the 5th (current) file
dq.(*diskQueue).writeFile.Write([]byte{0, 0, 0, 0})
Expand All @@ -331,14 +338,15 @@ func TestDiskQueueCorruption(t *testing.T) {

dq.Put(msg)
dq.Put(msg)

// corrupt the last file
dqFn = dq.(*diskQueue).fileName(5)
os.Truncate(dqFn, 100)

Equal(t, int64(2), dq.Depth())

// return one message and try reading again from corrupted file
<-dq.ReadChan()
Equal(t, msg, <-dq.ReadChan())

// give diskqueue time to handle read error
time.Sleep(50 * time.Millisecond)
Expand Down Expand Up @@ -773,3 +781,39 @@ func benchmarkDiskQueueGet(size int64, b *testing.B) {
<-dq.ReadChan()
}
}

func TestDiskQueueRollAsync(t *testing.T) {
l := NewTestLogger(t)
dqName := "test_disk_queue_roll" + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
msg := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
ml := int64(len(msg))
dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l)
defer dq.Close()
NotNil(t, dq)
Equal(t, int64(0), dq.Depth())

for i := 0; i < 11; i++ {
err := dq.Put(msg)
Nil(t, err)
Equal(t, int64(1), dq.Depth())

Equal(t, msg, <-dq.ReadChan())
Equal(t, int64(0), dq.Depth())
}

Equal(t, int64(1), dq.(*diskQueue).writeFileNum)
Equal(t, int64(ml+4), dq.(*diskQueue).writePos)

filepath.Walk(tmpDir, func(path string, info fs.FileInfo, err error) error {
if strings.HasSuffix(path, ".bad") {
t.FailNow()
}

return err
})
}

0 comments on commit 5a894f7

Please sign in to comment.