From 674d62016e6ae3a20fbf414a288b676ad9bd5131 Mon Sep 17 00:00:00 2001 From: "Tomi P. Hakala" Date: Sun, 5 Jan 2025 13:05:04 +0200 Subject: [PATCH] feat: implement audio watchdog for RTSP source monitoring - Added audioWatchdog struct to track the last time data was received from the RTSP source. - Introduced a goroutine that checks for data reception every 5 seconds and triggers a restart if no data is received for 60 seconds. - Updated processAudio method to integrate the watchdog, ensuring it updates the last data time upon receiving audio data. - Enhanced error handling to wait for the watchdog to finish before returning errors related to data reception. --- internal/myaudio/ffmpeg_input.go | 46 ++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/internal/myaudio/ffmpeg_input.go b/internal/myaudio/ffmpeg_input.go index 9685ea2..fdee1c9 100644 --- a/internal/myaudio/ffmpeg_input.go +++ b/internal/myaudio/ffmpeg_input.go @@ -33,6 +33,12 @@ type FFmpegProcess struct { restartTracker *FFmpegRestartTracker } +// audioWatchdog is a struct that keeps track of the last time data was received from the RTSP source +type audioWatchdog struct { + lastDataTime time.Time + mu sync.Mutex +} + // FFmpegRestartTracker keeps track of restart information for each RTSP source type FFmpegRestartTracker struct { restartCount int @@ -195,24 +201,50 @@ func (p *FFmpegProcess) Cleanup(url string) { func (p *FFmpegProcess) processAudio(ctx context.Context, url string, audioLevelChan chan AudioLevelData) error { // Create a buffer to store audio data buf := make([]byte, 32768) + watchdog := &audioWatchdog{lastDataTime: time.Now()} + + // Start watchdog goroutine + watchdogDone := make(chan struct{}) + go func() { + defer close(watchdogDone) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if watchdog.timeSinceLastData() > 60*time.Second { + log.Printf("No data received from RTSP source %s for 60 seconds, triggering restart", url) + return + } + } + } + }() // Continuously process audio data for { select { // Check if the context has been cancelled case <-ctx.Done(): + <-watchdogDone // Wait for watchdog to finish // If so, return the context error return ctx.Err() + case <-watchdogDone: + return fmt.Errorf("watchdog detected no data, restarting RTSP source %s", url) default: // Read audio data from FFmpeg's stdout n, err := p.stdout.Read(buf) if err != nil { + <-watchdogDone // Wait for watchdog to finish // Error occurred while reading from ffmpeg, this covers EOF and other errors return fmt.Errorf("error reading from ffmpeg: %w", err) } // Ensure we don't process more data than we've read if n > 0 { + watchdog.update() // Update the watchdog timestamp // Write the audio data to the analysis buffer WriteToAnalysisBuffer(url, buf[:n]) // Write the audio data to the capture buffer @@ -451,3 +483,17 @@ func CaptureAudioRTSP(url string, transport string, wg *sync.WaitGroup, quitChan log.Printf("FFmpeg lifecycle manager for RTSP source %s exited with error: %v", url, err) } } + +// audioWatchdog is a struct that keeps track of the last time data was received from the RTSP source +func (w *audioWatchdog) update() { + w.mu.Lock() + w.lastDataTime = time.Now() + w.mu.Unlock() +} + +// timeSinceLastData returns the time since the last data was received from the RTSP source +func (w *audioWatchdog) timeSinceLastData() time.Duration { + w.mu.Lock() + defer w.mu.Unlock() + return time.Since(w.lastDataTime) +}