Skip to content

Commit

Permalink
Add some verbose logs to video download threads (#1033)
Browse files Browse the repository at this point in the history
* Create DownloadThread class

* Add some verbose logging!

* Cleanup

* Move DownloadVideoPartAsync into DownloadThread

* Cleanup

* Extract DownloadThread to VideoDownloadThread and several methods to DownloadTools
  • Loading branch information
ScrubN authored Apr 8, 2024
1 parent ab029ec commit f9859bb
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ internal abstract class TwitchDownloaderArgs
[Option("banner", Default = true, HelpText = "Displays a banner containing version and copyright information.")]
public bool? ShowBanner { get; set; }

[Option("log-level", Default = Models.LogLevel.Status | LogLevel.Info | LogLevel.Warning | LogLevel.Error, HelpText = "Sets the log level flags. Applicable values are: None, Status, Verbose, Info, Warning, Error, Ffmpeg.")]
[Option("log-level", Default = LogLevel.Status | LogLevel.Info | LogLevel.Warning | LogLevel.Error, HelpText = "Sets the log level flags. Applicable values are: None, Status, Verbose, Info, Warning, Error, Ffmpeg.")]
public LogLevel LogLevel { get; set; }
}
}
1 change: 0 additions & 1 deletion TwitchDownloaderCore/Interfaces/ITaskLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ namespace TwitchDownloaderCore.Interfaces
{
public interface ITaskLogger
{
// TODO: Add DefaultInterpolatedStringHandler overloads once log levels are implemented for zero-alloc logging
void LogVerbose(string logMessage);
void LogVerbose(DefaultInterpolatedStringHandler logMessage);
void LogInfo(string logMessage);
Expand Down
94 changes: 94 additions & 0 deletions TwitchDownloaderCore/Tools/DownloadTools.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
using System;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using TwitchDownloaderCore.Interfaces;

namespace TwitchDownloaderCore.Tools
{
public static class DownloadTools
{
/// <summary>
/// Downloads the requested <paramref name="url"/> to the <paramref name="destinationFile"/> without storing it in memory.
/// </summary>
/// <param name="httpClient">The <see cref="HttpClient"/> to perform the download operation.</param>
/// <param name="url">The url of the file to download.</param>
/// <param name="destinationFile">The path to the file where download will be saved.</param>
/// <param name="throttleKib">The maximum download speed in kibibytes per second, or -1 for no maximum.</param>
/// <param name="logger">Logger.</param>
/// <param name="cancellationTokenSource">A <see cref="CancellationTokenSource"/> containing a <see cref="CancellationToken"/> to cancel the operation.</param>
/// <remarks>The <paramref name="cancellationTokenSource"/> may be canceled by this method.</remarks>
public static async Task DownloadFileAsync(HttpClient httpClient, Uri url, string destinationFile, int throttleKib, ITaskLogger logger, CancellationTokenSource cancellationTokenSource = null)
{
var request = new HttpRequestMessage(HttpMethod.Get, url);

var cancellationToken = cancellationTokenSource?.Token ?? CancellationToken.None;

using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
response.EnsureSuccessStatusCode();

// Why are we setting a CTS CancelAfter timer? See lay295#265
const int SIXTY_SECONDS = 60;
if (throttleKib == -1 || !response.Content.Headers.ContentLength.HasValue)
{
cancellationTokenSource?.CancelAfter(TimeSpan.FromSeconds(SIXTY_SECONDS));
}
else
{
const double ONE_KIBIBYTE = 1024d;
cancellationTokenSource?.CancelAfter(TimeSpan.FromSeconds(Math.Max(
SIXTY_SECONDS,
response.Content.Headers.ContentLength!.Value / ONE_KIBIBYTE / throttleKib * 8 // Allow up to 8x the shortest download time given the thread bandwidth
)));
}

switch (throttleKib)
{
case -1:
{
await using var fs = new FileStream(destinationFile, FileMode.Create, FileAccess.Write, FileShare.Read);
await response.Content.CopyToAsync(fs, cancellationToken).ConfigureAwait(false);
break;
}
default:
{
try
{
await using var contentStream = await response.Content.ReadAsStreamAsync(cancellationToken);
await using var throttledStream = new ThrottledStream(contentStream, throttleKib);
await using var fs = new FileStream(destinationFile, FileMode.Create, FileAccess.Write, FileShare.Read);
await throttledStream.CopyToAsync(fs, cancellationToken).ConfigureAwait(false);
}
catch (IOException ex) when (ex.Message.Contains("EOF"))
{
// If we get an exception for EOF, it may be related to the throttler. Try again without it.
logger.LogVerbose($"Unexpected EOF, retrying without bandwidth throttle. Message: {ex.Message}.");
await Task.Delay(2_000, cancellationToken);
goto case -1;
}
break;
}
}

// Reset the cts timer so it can be reused for the next download on this thread.
// Is there a friendlier way to do this? Yes. Does it involve creating and destroying 4,000 CancellationTokenSources that are almost never cancelled? Also Yes.
cancellationTokenSource?.CancelAfter(TimeSpan.FromMilliseconds(uint.MaxValue - 1));
}


/// <summary>
/// Some old twitch VODs have files with a query string at the end such as 1.ts?offset=blah which isn't a valid filename
/// </summary>
public static string RemoveQueryString(string inputString)
{
var queryIndex = inputString.IndexOf('?');
if (queryIndex == -1)
{
return inputString;
}

return inputString[..queryIndex];
}
}
}
153 changes: 153 additions & 0 deletions TwitchDownloaderCore/Tools/VideoDownloadThread.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using TwitchDownloaderCore.Interfaces;

namespace TwitchDownloaderCore.Tools
{
internal sealed record VideoDownloadThread
{
private readonly ConcurrentQueue<string> _videoPartsQueue;
private readonly HttpClient _client;
private readonly Uri _baseUrl;
private readonly string _cacheFolder;
private readonly DateTimeOffset _vodAirDate;
private TimeSpan VodAge => DateTimeOffset.UtcNow - _vodAirDate;
private readonly int _throttleKib;
private readonly ITaskLogger _logger;
private readonly CancellationToken _cancellationToken;
public Task ThreadTask { get; private set; }

public VideoDownloadThread(ConcurrentQueue<string> videoPartsQueue, HttpClient httpClient, Uri baseUrl, string cacheFolder, DateTimeOffset vodAirDate, int throttleKib, ITaskLogger logger, CancellationToken cancellationToken)
{
_videoPartsQueue = videoPartsQueue;
_client = httpClient;
_baseUrl = baseUrl;
_cacheFolder = cacheFolder;
_vodAirDate = vodAirDate;
_throttleKib = throttleKib;
_logger = logger;
_cancellationToken = cancellationToken;
StartDownload();
}

public void StartDownload()
{
if (ThreadTask is { Status: TaskStatus.Created or TaskStatus.WaitingForActivation or TaskStatus.WaitingToRun or TaskStatus.Running })
{
throw new InvalidOperationException($"Tried to start a thread that was already running or waiting to run ({ThreadTask.Status}).");
}

ThreadTask = Task.Factory.StartNew(
ExecuteDownloadThread,
_cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Current);
}

private void ExecuteDownloadThread()
{
using var cts = new CancellationTokenSource();
_cancellationToken.Register(PropagateCancel, cts);

while (!_videoPartsQueue.IsEmpty)
{
_cancellationToken.ThrowIfCancellationRequested();

string videoPart = null;
try
{
if (_videoPartsQueue.TryDequeue(out videoPart))
{
DownloadVideoPartAsync(videoPart, cts).GetAwaiter().GetResult();
}
}
catch
{
if (videoPart != null && !_cancellationToken.IsCancellationRequested)
{
// Requeue the video part now instead of deferring to the verifier since we already know it's bad
_videoPartsQueue.Enqueue(videoPart);
}

throw;
}

const int A_PRIME_NUMBER = 71;
Thread.Sleep(A_PRIME_NUMBER);
}
}

private static void PropagateCancel(object tokenSourceToCancel)
{
try
{
(tokenSourceToCancel as CancellationTokenSource)?.Cancel();
}
catch (ObjectDisposedException) { }
}

/// <remarks>The <paramref name="cancellationTokenSource"/> may be canceled by this method.</remarks>
private async Task DownloadVideoPartAsync(string videoPartName, CancellationTokenSource cancellationTokenSource)
{
var tryUnmute = VodAge < TimeSpan.FromHours(24);
var errorCount = 0;
var timeoutCount = 0;
while (true)
{
cancellationTokenSource.Token.ThrowIfCancellationRequested();

try
{
var partFile = Path.Combine(_cacheFolder, DownloadTools.RemoveQueryString(videoPartName));
if (tryUnmute && videoPartName.Contains("-muted"))
{
var unmutedPartName = videoPartName.Replace("-muted", "");
await DownloadTools.DownloadFileAsync(_client, new Uri(_baseUrl, unmutedPartName), partFile, _throttleKib, _logger, cancellationTokenSource);
}
else
{
await DownloadTools.DownloadFileAsync(_client, new Uri(_baseUrl, videoPartName), partFile, _throttleKib, _logger, cancellationTokenSource);
}

return;
}
catch (HttpRequestException ex) when (tryUnmute && ex.StatusCode is HttpStatusCode.Forbidden)
{
_logger.LogVerbose($"Received {ex.StatusCode}: {ex.StatusCode} when trying to unmute {videoPartName}. Disabling {nameof(tryUnmute)}.");
tryUnmute = false;

await Task.Delay(100, cancellationTokenSource.Token);
}
catch (HttpRequestException ex)
{
const int MAX_RETRIES = 10;

_logger.LogVerbose($"Received {(int)(ex.StatusCode ?? 0)}: {ex.StatusCode} for {videoPartName}. {MAX_RETRIES - (errorCount + 1)} retries left.");
if (++errorCount > MAX_RETRIES)
{
throw new HttpRequestException($"Video part {videoPartName} failed after {MAX_RETRIES} retries");
}

await Task.Delay(1_000 * errorCount, cancellationTokenSource.Token);
}
catch (TaskCanceledException ex) when (ex.Message.Contains("HttpClient.Timeout"))
{
const int MAX_RETRIES = 3;

_logger.LogVerbose($"{videoPartName} timed out. {MAX_RETRIES - (timeoutCount + 1)} retries left.");
if (++timeoutCount > MAX_RETRIES)
{
throw new HttpRequestException($"Video part {videoPartName} timed out {MAX_RETRIES} times");
}

await Task.Delay(5_000 * timeoutCount, cancellationTokenSource.Token);
}
}
}
}
}
Loading

0 comments on commit f9859bb

Please sign in to comment.