-
Notifications
You must be signed in to change notification settings - Fork 175
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
44001c6
commit 945991e
Showing
1 changed file
with
108 additions
and
55 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -65,17 +65,17 @@ public function store(Collection $items): void | |
]; | ||
|
||
$this | ||
->aggregateAttributes($entries->filter->isCount(), $periods, 'count') | ||
->aggregateCounts($entries->filter->isCount(), $periods) | ||
->chunk($this->config->get('pulse.storage.database.chunk')) | ||
->each(fn ($chunk) => $this->upsertCount($chunk->all())); | ||
|
||
$this | ||
->aggregateAttributes($entries->filter->isMax(), $periods, 'max') | ||
->aggregateMaximums($entries->filter->isMax(), $periods) | ||
->chunk($this->config->get('pulse.storage.database.chunk')) | ||
->each(fn ($chunk) => $this->upsertMax($chunk->all())); | ||
|
||
$this | ||
->aggregateAttributes($entries->filter->isAvg(), $periods, 'avg') | ||
->aggregateAverages($entries->filter->isAvg(), $periods) | ||
->chunk($this->config->get('pulse.storage.database.chunk')) | ||
->each(fn ($chunk) => $this->upsertAvg($chunk->all())); | ||
|
||
|
@@ -213,80 +213,133 @@ protected function upsert(array $values, string $onDuplicateKeyClause): bool | |
} | ||
|
||
/** | ||
* Get the aggregate attributes for the collection. | ||
* Get the count aggregates | ||
* | ||
* @param \Illuminate\Support\Collection<int, \Laravel\Pulse\Entry> $entries | ||
* @param list<int> $periods | ||
* @return \Illuminate\Support\LazyCollection<int, AggregateRow> | ||
* @return \Illuminate\Support\Collection<int, AggregateRow> | ||
*/ | ||
protected function aggregateAttributes(Collection $entries, array $periods, string $aggregateSuffix): Collection | ||
protected function aggregateCounts(Collection $entries, array $periods): Collection | ||
{ | ||
$aggregates = LazyCollection::make(function () use ($entries, $periods, $aggregateSuffix) { | ||
foreach ($entries as $entry) { | ||
foreach ($periods as $period) { | ||
// Exclude entries that would be trimmed. | ||
if ($entry->timestamp < CarbonImmutable::now()->subMinutes($period)->getTimestamp()) { | ||
continue; | ||
} | ||
$aggregates = []; | ||
|
||
foreach ($entries as $entry) { | ||
foreach ($periods as $period) { | ||
// TODO: add back the comment | ||
if ($entry->timestamp < CarbonImmutable::now()->subMinutes($period)->getTimestamp()) { | ||
continue; | ||
} | ||
|
||
yield [ | ||
'bucket' => (int) (floor($entry->timestamp / $period) * $period), | ||
$bucket = (int) (floor($entry->timestamp / $period) * $period); | ||
|
||
$key = $entry->type.':'.$period.':'.$bucket.':'.$entry->key; | ||
|
||
if (! isset($aggregates[$key])) { | ||
$aggregates[$key] = [ | ||
'bucket' => $bucket, | ||
'period' => $period, | ||
'type' => $entry->type, | ||
'aggregate' => $aggregateSuffix, | ||
'aggregate' => 'count', | ||
'key' => $entry->key, | ||
'value' => $aggregateSuffix === 'count' | ||
? 1 | ||
: $entry->value, | ||
...($aggregateSuffix === 'avg') | ||
? ['count' => 1] | ||
: [], | ||
'value' => 1, | ||
]; | ||
} else { | ||
$aggregates[$key]['value']++; | ||
} | ||
} | ||
}); | ||
|
||
$collapsed = match ($aggregateSuffix) { | ||
'count' => $this->collapseCounts(collect($aggregates)), | ||
'max' => $this->collapseMaxes(collect($aggregates)), | ||
'avg' => $this->collapseAverages(collect($aggregates)), | ||
}; | ||
} | ||
|
||
return $collapsed; | ||
return collect(array_values($aggregates)); | ||
Check failure on line 252 in src/Storage/DatabaseStorage.php GitHub Actions / Static Analysis
|
||
} | ||
|
||
protected function collapseCounts(Collection $entries): Collection | ||
/** | ||
* Get the maximum aggregates | ||
* | ||
* @param \Illuminate\Support\Collection<int, \Laravel\Pulse\Entry> $entries | ||
* @param list<int> $periods | ||
* @return \Illuminate\Support\Collection<int, AggregateRow> | ||
*/ | ||
protected function aggregateMaximums(Collection $entries, array $periods): Collection | ||
{ | ||
return $entries | ||
->groupBy(fn ($value) => implode('-', Arr::only($value, ['bucket', 'period', 'type', 'aggregate', 'key']))) | ||
->map(fn ($values) => [ | ||
...$values[0], | ||
'value' => $values->count(), | ||
]) | ||
->values(); | ||
$aggregates = []; | ||
|
||
foreach ($entries as $entry) { | ||
foreach ($periods as $period) { | ||
if ($entry->timestamp < CarbonImmutable::now()->subMinutes($period)->getTimestamp()) { | ||
continue; | ||
} | ||
|
||
$bucket = (int) (floor($entry->timestamp / $period) * $period); | ||
|
||
$key = $entry->type.':'.$period.':'.$bucket.':'.$entry->key; | ||
|
||
if (! isset($aggregates[$key])) { | ||
$aggregates[$key] = [ | ||
'bucket' => $bucket, | ||
'period' => $period, | ||
'type' => $entry->type, | ||
'aggregate' => 'max', | ||
'key' => $entry->key, | ||
'value' => $entry->value, | ||
]; | ||
} else { | ||
$aggregates[$key]['value'] = max($aggregates[$key]['value'], $entry->value); | ||
} | ||
} | ||
} | ||
|
||
return collect(array_values($aggregates)); | ||
Check failure on line 291 in src/Storage/DatabaseStorage.php GitHub Actions / Static Analysis
|
||
} | ||
|
||
protected function collapseMaxes(Collection $entries): Collection | ||
/** | ||
* Get the average aggregates | ||
* | ||
* @param \Illuminate\Support\Collection<int, \Laravel\Pulse\Entry> $entries | ||
* @param list<int> $periods | ||
* @return \Illuminate\Support\Collection<int, AggregateRow> | ||
*/ | ||
protected function aggregateAverages(Collection $entries): Collection | ||
Check failure on line 301 in src/Storage/DatabaseStorage.php GitHub Actions / Static Analysis
|
||
{ | ||
return $entries | ||
->groupBy(fn ($value) => implode('-', Arr::only($value, ['bucket', 'period', 'type', 'aggregate', 'key']))) | ||
->map(fn ($values) => [ | ||
...$values[0], | ||
'value' => $values->max('value'), | ||
]) | ||
->values(); | ||
$aggregates = []; | ||
|
||
foreach ($entries as $entry) { | ||
foreach ($this->periods() as $period) { | ||
if ($entry->timestamp < CarbonImmutable::now()->subMinutes($period)->getTimestamp()) { | ||
continue; | ||
} | ||
|
||
$bucket = (int) (floor($entry->timestamp / $period) * $period); | ||
|
||
$key = $entry->type.':'.$period.':'.$bucket.':'.$entry->key; | ||
|
||
if (! isset($aggregates[$key])) { | ||
$aggregates[$key] = [ | ||
'bucket' => $bucket, | ||
'period' => $period, | ||
'type' => $entry->type, | ||
'aggregate' => 'avg', | ||
'key' => $entry->key, | ||
'value' => $entry->value, | ||
'count' => 1, | ||
]; | ||
} else { | ||
$aggregates[$key]['value'] = ($aggregates[$key]['value'] * $aggregates[$key]['count'] + $entry->value) / ($aggregates[$key]['count'] + 1); | ||
$aggregates[$key]['count']++; | ||
} | ||
} | ||
} | ||
|
||
return collect(array_values($aggregates)); | ||
Check failure on line 332 in src/Storage/DatabaseStorage.php GitHub Actions / Static Analysis
|
||
} | ||
|
||
protected function collapseAverages(Collection $entries): Collection | ||
protected function periods() | ||
{ | ||
return $entries | ||
->groupBy(fn ($value) => implode('-', Arr::only($value, ['bucket', 'period', 'type', 'aggregate', 'key']))) | ||
->map(fn ($values) => [ | ||
...$values[0], | ||
'value' => $values->avg('value'), | ||
'count' => $values->count(), | ||
]) | ||
->values(); | ||
return [ | ||
(int) (CarbonInterval::hour()->totalSeconds / 60), | ||
(int) (CarbonInterval::hours(6)->totalSeconds / 60), | ||
(int) (CarbonInterval::hours(24)->totalSeconds / 60), | ||
(int) (CarbonInterval::days(7)->totalSeconds / 60), | ||
]; | ||
} | ||
|
||
/** | ||
|