Skip to content

Commit

Permalink
Implemented sized channel with size -1
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Jan 13, 2025
1 parent b9b0714 commit 65fc5eb
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe

if usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
be.queueFactory = exporterqueue.NewBlockingMemoryQueueFactory[internal.Request]()
be.queueCfg.QueueSize = 20
be.queueCfg.QueueSize = -1
q := be.queueFactory(
context.Background(),
exporterqueue.Settings{
Expand Down
7 changes: 6 additions & 1 deletion exporter/exporterqueue/sized_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type sizedChannel[T any] struct {

// We need to store the capacity in a separate field because the capacity of the channel can be higher.
// It happens when we restore a persistent queue from a disk that is bigger than the pre-configured capacity.
// If cap is set to -1, sizedChannel
cap int64
ch chan T
}
Expand Down Expand Up @@ -45,7 +46,7 @@ func newSizedChannel[T any](capacity int64, els []T, totalSize int64) *sizedChan
// If the callback returns an error, the element is not put into the queue and the error is returned.
// The size is the size of the element MUST be positive.
func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error {
if vcq.used.Add(size) > vcq.cap {
if vcq.cap != -1 && vcq.used.Add(size) > vcq.cap {
vcq.used.Add(-size)
return ErrQueueIsFull
}
Expand All @@ -62,6 +63,10 @@ func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error
case vcq.ch <- el:
return nil
default:
if vcq.cap == -1 {
vcq.ch <- el
return nil
}
vcq.used.Add(-size)
return ErrQueueIsFull
}
Expand Down

0 comments on commit 65fc5eb

Please sign in to comment.