Skip to content

Commit

Permalink
Verify video parts on download threads (#1110)
Browse files Browse the repository at this point in the history
* Verify file length on download thread

* Jitter download thread sleeps

* Simplify VideoDownloader video part verification
  • Loading branch information
ScrubN authored Jun 24, 2024
1 parent 07f97c0 commit 085070b
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 36 deletions.
5 changes: 4 additions & 1 deletion TwitchDownloaderCore/Tools/DownloadTools.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ public static class DownloadTools
/// <param name="throttleKib">The maximum download speed in kibibytes per second, or -1 for no maximum.</param>
/// <param name="logger">Logger.</param>
/// <param name="cancellationTokenSource">A <see cref="CancellationTokenSource"/> containing a <see cref="CancellationToken"/> to cancel the operation.</param>
/// <returns>The expected length of the downloaded file, or -1 if the content length header is not present.</returns>
/// <remarks>The <paramref name="cancellationTokenSource"/> may be canceled by this method.</remarks>
public static async Task DownloadFileAsync(HttpClient httpClient, Uri url, string destinationFile, int throttleKib, ITaskLogger logger, CancellationTokenSource cancellationTokenSource = null)
public static async Task<long> DownloadFileAsync(HttpClient httpClient, Uri url, string destinationFile, int throttleKib, ITaskLogger logger, CancellationTokenSource cancellationTokenSource = null)
{
var request = new HttpRequestMessage(HttpMethod.Get, url);

Expand Down Expand Up @@ -74,6 +75,8 @@ public static async Task DownloadFileAsync(HttpClient httpClient, Uri url, strin
// Reset the cts timer so it can be reused for the next download on this thread.
// Is there a friendlier way to do this? Yes. Does it involve creating and destroying 4,000 CancellationTokenSources that are almost never cancelled? Also Yes.
cancellationTokenSource?.CancelAfter(TimeSpan.FromMilliseconds(uint.MaxValue - 1));

return response.Content.Headers.ContentLength ?? -1;
}


Expand Down
46 changes: 39 additions & 7 deletions TwitchDownloaderCore/Tools/VideoDownloadThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ private void ExecuteDownloadThread()
throw;
}

const int A_PRIME_NUMBER = 71;
Thread.Sleep(A_PRIME_NUMBER);
Thread.Sleep(Random.Shared.Next(50, 150));
}
}

Expand All @@ -97,21 +96,48 @@ private async Task DownloadVideoPartAsync(string videoPartName, CancellationToke
var tryUnmute = VodAge < TimeSpan.FromHours(24);
var errorCount = 0;
var timeoutCount = 0;
var lengthFailureCount = 0;
while (true)
{
cancellationTokenSource.Token.ThrowIfCancellationRequested();

try
{
var partFile = Path.Combine(_cacheFolder, DownloadTools.RemoveQueryString(videoPartName));
long expectedLength;
if (tryUnmute && videoPartName.Contains("-muted"))
{
var unmutedPartName = videoPartName.Replace("-muted", "");
await DownloadTools.DownloadFileAsync(_client, new Uri(_baseUrl, unmutedPartName), partFile, _throttleKib, _logger, cancellationTokenSource);
expectedLength = await DownloadTools.DownloadFileAsync(_client, new Uri(_baseUrl, unmutedPartName), partFile, _throttleKib, _logger, cancellationTokenSource);
}
else
{
await DownloadTools.DownloadFileAsync(_client, new Uri(_baseUrl, videoPartName), partFile, _throttleKib, _logger, cancellationTokenSource);
expectedLength = await DownloadTools.DownloadFileAsync(_client, new Uri(_baseUrl, videoPartName), partFile, _throttleKib, _logger, cancellationTokenSource);
}

if (expectedLength is not -1)
{
// I would love to compare hashes here but unfortunately Twitch doesn't give us a ContentMD5 header
var actualLength = new FileInfo(partFile).Length;
if (actualLength != expectedLength)
{
const int MAX_RETRIES = 1;

_logger.LogVerbose($"{partFile} failed to verify: expected {expectedLength:N0}B, got {actualLength:N0}B.");
if (++lengthFailureCount > MAX_RETRIES)
{
throw new Exception($"Failed to download {partFile}: expected {expectedLength:N0}B, got {actualLength:N0}B.");
}

await Delay(1_000, cancellationTokenSource.Token);
continue;
}

const int TS_PACKET_LENGTH = 188; // MPEG TS packets are made of a header and a body: [ 4B ][ 184B ] - https://tsduck.io/download/docs/mpegts-introduction.pdf
if (expectedLength % TS_PACKET_LENGTH != 0)
{
_logger.LogVerbose($"{partFile} contains malformed packets and may cause encoding issues.");
}
}

return;
Expand All @@ -121,7 +147,7 @@ private async Task DownloadVideoPartAsync(string videoPartName, CancellationToke
_logger.LogVerbose($"Received {ex.StatusCode}: {ex.StatusCode} when trying to unmute {videoPartName}. Disabling {nameof(tryUnmute)}.");
tryUnmute = false;

await Task.Delay(100, cancellationTokenSource.Token);
await Delay(100, cancellationTokenSource.Token);
}
catch (HttpRequestException ex)
{
Expand All @@ -133,7 +159,7 @@ private async Task DownloadVideoPartAsync(string videoPartName, CancellationToke
throw new HttpRequestException($"Video part {videoPartName} failed after {MAX_RETRIES} retries");
}

await Task.Delay(1_000 * errorCount, cancellationTokenSource.Token);
await Delay(1_000 * errorCount, cancellationTokenSource.Token);
}
catch (TaskCanceledException ex) when (ex.Message.Contains("HttpClient.Timeout"))
{
Expand All @@ -145,9 +171,15 @@ private async Task DownloadVideoPartAsync(string videoPartName, CancellationToke
throw new HttpRequestException($"Video part {videoPartName} timed out {MAX_RETRIES} times");
}

await Task.Delay(5_000 * timeoutCount, cancellationTokenSource.Token);
await Delay(5_000 * timeoutCount, cancellationTokenSource.Token);
}
}
}

private static Task Delay(int millis, CancellationToken cancellationToken)
{
var jitteredMillis = millis + Random.Shared.Next(-200, 200);
return Task.Delay(Math.Max(millis / 2, jitteredMillis), cancellationToken);
}
}
}
32 changes: 4 additions & 28 deletions TwitchDownloaderCore/VideoDownloader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,8 @@ private async Task VerifyDownloadedParts(ICollection<M3U8.Stream> playlist, Rang
foreach (var part in playlist.Take(videoListCrop))
{
var filePath = Path.Combine(downloadFolder, DownloadTools.RemoveQueryString(part.Path));
if (!VerifyVideoPart(filePath))
var fi = new FileInfo(filePath);
if (!fi.Exists || fi.Length == 0)
{
failedParts.Add(part);
}
Expand All @@ -312,43 +313,18 @@ private async Task VerifyDownloadedParts(ICollection<M3U8.Stream> playlist, Rang

if (failedParts.Count != 0)
{
if (playlist.Count == 1)
{
// The video is only 1 part, it probably won't be a complete file.
return;
}

if (partCount > 20 && failedParts.Count >= partCount * 0.95)
{
// 19/20 parts failed to verify. Either the VOD is heavily corrupted or something went horribly wrong.
// 19/20 parts are missing or empty, something went horribly wrong.
// TODO: Somehow let the user bypass this. Maybe with callbacks?
throw new Exception($"Too many parts are corrupted or missing ({failedParts.Count}/{partCount}), aborting.");
throw new Exception($"Too many parts are missing ({failedParts.Count}/{partCount}), aborting.");
}

_progress.LogInfo($"The following parts will be redownloaded: {string.Join(", ", failedParts)}");
await DownloadVideoPartsAsync(failedParts, videoListCrop, baseUrl, downloadFolder, vodAirDate, cancellationToken);
}
}

private static bool VerifyVideoPart(string filePath)
{
const int TS_PACKET_LENGTH = 188; // MPEG TS packets are made of a header and a body: [ 4B ][ 184B ] - https://tsduck.io/download/docs/mpegts-introduction.pdf

if (!File.Exists(filePath))
{
return false;
}

using var fs = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read);
var fileLength = fs.Length;
if (fileLength == 0 || fileLength % TS_PACKET_LENGTH != 0)
{
return false;
}

return true;
}

private int RunFfmpegVideoCopy(string tempFolder, FileInfo outputFile, string concatListPath, string metadataPath, decimal startOffset, decimal endOffset, TimeSpan videoLength)
{
using var process = new Process
Expand Down

0 comments on commit 085070b

Please sign in to comment.