Skip to content

Commit

Permalink
feat: implement audio watchdog for RTSP source monitoring
Browse files Browse the repository at this point in the history
- Added audioWatchdog struct to track the last time data was received from the RTSP source.
- Introduced a goroutine that checks for data reception every 5 seconds and triggers a restart if no data is received for 60 seconds.
- Updated processAudio method to integrate the watchdog, ensuring it updates the last data time upon receiving audio data.
- Enhanced error handling to wait for the watchdog to finish before returning errors related to data reception.
  • Loading branch information
tphakala committed Jan 5, 2025
1 parent 175dd9c commit 674d620
Showing 1 changed file with 46 additions and 0 deletions.
46 changes: 46 additions & 0 deletions internal/myaudio/ffmpeg_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ type FFmpegProcess struct {
restartTracker *FFmpegRestartTracker
}

// audioWatchdog is a struct that keeps track of the last time data was received from the RTSP source
type audioWatchdog struct {
lastDataTime time.Time
mu sync.Mutex
}

// FFmpegRestartTracker keeps track of restart information for each RTSP source
type FFmpegRestartTracker struct {
restartCount int
Expand Down Expand Up @@ -195,24 +201,50 @@ func (p *FFmpegProcess) Cleanup(url string) {
func (p *FFmpegProcess) processAudio(ctx context.Context, url string, audioLevelChan chan AudioLevelData) error {
// Create a buffer to store audio data
buf := make([]byte, 32768)
watchdog := &audioWatchdog{lastDataTime: time.Now()}

// Start watchdog goroutine
watchdogDone := make(chan struct{})
go func() {
defer close(watchdogDone)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if watchdog.timeSinceLastData() > 60*time.Second {
log.Printf("No data received from RTSP source %s for 60 seconds, triggering restart", url)
return
}
}
}
}()

// Continuously process audio data
for {
select {
// Check if the context has been cancelled
case <-ctx.Done():
<-watchdogDone // Wait for watchdog to finish
// If so, return the context error
return ctx.Err()
case <-watchdogDone:
return fmt.Errorf("watchdog detected no data, restarting RTSP source %s", url)
default:
// Read audio data from FFmpeg's stdout
n, err := p.stdout.Read(buf)
if err != nil {
<-watchdogDone // Wait for watchdog to finish
// Error occurred while reading from ffmpeg, this covers EOF and other errors
return fmt.Errorf("error reading from ffmpeg: %w", err)
}

// Ensure we don't process more data than we've read
if n > 0 {
watchdog.update() // Update the watchdog timestamp
// Write the audio data to the analysis buffer
WriteToAnalysisBuffer(url, buf[:n])
// Write the audio data to the capture buffer
Expand Down Expand Up @@ -451,3 +483,17 @@ func CaptureAudioRTSP(url string, transport string, wg *sync.WaitGroup, quitChan
log.Printf("FFmpeg lifecycle manager for RTSP source %s exited with error: %v", url, err)
}
}

// audioWatchdog is a struct that keeps track of the last time data was received from the RTSP source
func (w *audioWatchdog) update() {
w.mu.Lock()
w.lastDataTime = time.Now()
w.mu.Unlock()
}

// timeSinceLastData returns the time since the last data was received from the RTSP source
func (w *audioWatchdog) timeSinceLastData() time.Duration {
w.mu.Lock()
defer w.mu.Unlock()
return time.Since(w.lastDataTime)
}

0 comments on commit 674d620

Please sign in to comment.