Skip to content

Commit

Permalink
#83: fixes in ZlibStream, XEP25Socket, KerbProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
ForNeVeR committed Mar 16, 2017
1 parent c78646e commit 4d473b8
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 219 deletions.
8 changes: 8 additions & 0 deletions src/JabberNet/JabberNet.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,19 @@
<Copyright>Copyright (c) Cursive, Inc. 2000—2008; Jabber-Net Contributors, 2008—2017</Copyright>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<AllowUnsafeBlocks>True</AllowUnsafeBlocks>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="ComponentAce.Compression.Libs.zlib" Version="1.0.4" />
<PackageReference Include="System.Collections" Version="4.3.0" />
<PackageReference Include="System.Collections.NonGeneric" Version="4.3.0" />
<PackageReference Include="System.IO.FileSystem.Watcher" Version="4.3.0" />
<PackageReference Include="System.Net.Requests" Version="4.3.0" />
<PackageReference Include="System.Net.Security" Version="4.3.0" />
<PackageReference Include="System.Net.WebHeaderCollection" Version="4.3.0" />
<PackageReference Include="System.Threading.Thread" Version="4.3.0" />
<PackageReference Include="System.Xml.XmlDocument" Version="4.3.0" />
<PackageReference Include="System.Xml.XPath.XmlDocument" Version="4.3.0" />
</ItemGroup>
Expand Down
154 changes: 18 additions & 136 deletions src/JabberNet/bedrock/io/ZlibStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using ComponentAce.Compression.Libs.zlib;

namespace JabberNet.bedrock.io
{
/// <summary>
/// Compression failed.
/// </summary>
public class CompressionFailedException : ApplicationException
public class CompressionFailedException : Exception
{
/// <summary>
///
Expand Down Expand Up @@ -161,16 +163,7 @@ public override long Position
set { throw new NotImplementedException("The method or operation is not implemented."); }
}

/// <summary>
/// Start an async read. Implemented locally, since Stream.BeginRead() isn't really asynch.
/// </summary>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="count"></param>
/// <param name="callback"></param>
/// <param name="state"></param>
/// <returns></returns>
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (count <= 0)
throw new ArgumentException("Can't read 0 bytes", "count");
Expand All @@ -181,23 +174,10 @@ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, Asy
if (m_in.avail_in == 0)
{
m_in.next_in_index = 0;
return m_stream.BeginRead(m_inbuf, 0, bufsize, callback, state);
return m_stream.ReadAsync(m_inbuf, 0, bufsize, cancellationToken);
}
ZlibStreamAsyncResult ar = new ZlibStreamAsyncResult(state);
callback(ar);
return ar;
}

/// <summary>
/// Complete a pending read, when the callback passed to BeginRead fires.
/// </summary>
/// <param name="asyncResult"></param>
/// <returns></returns>
public override int EndRead(IAsyncResult asyncResult)
{
if (!(asyncResult is ZlibStreamAsyncResult))
m_in.avail_in = m_stream.EndRead(asyncResult);
return Inflate();
return Task.FromResult(Inflate());
}

/// <summary>
Expand Down Expand Up @@ -260,17 +240,7 @@ public override void SetLength(long value)
throw new NotImplementedException("The method or operation is not implemented.");
}

/// <summary>
/// Begin an asynch write, compressing first. Implemented locally, since Stream.BeginWrite isn't asynch.
/// Note: may call Write() on the underlying stream more than once.
/// </summary>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="count"></param>
/// <param name="callback"></param>
/// <param name="state"></param>
/// <returns></returns>
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (count <= 0)
throw new ArgumentException("Can't write 0 bytes", "count");
Expand All @@ -286,64 +256,37 @@ public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, As
{
if (err != zlibConst.Z_OK)
{
ZlibStreamAsyncResult res = new ZlibStreamAsyncResult(state, new CompressionFailedException("Compress failed: " + err));
callback(res);
return res;
throw new CompressionFailedException("Compress failed: " + err);
}
}
if (m_out.avail_in == 0)
return m_stream.BeginWrite(m_outbuf, 0, bufsize - m_out.avail_out, callback, state);
await m_stream.WriteAsync(m_outbuf, 0, bufsize - m_out.avail_out, cancellationToken);
else
return m_stream.BeginWrite(m_outbuf, 0, bufsize - m_out.avail_out, new AsyncCallback(IntermediateWrite), new ZlibState(callback, state));
}

private void IntermediateWrite(IAsyncResult asyncResult)
{
ZlibState state = (ZlibState)asyncResult.AsyncState;
try
{
m_stream.EndWrite(asyncResult);
}
catch (Exception e)
{
ZlibStreamAsyncResult res = new ZlibStreamAsyncResult(state.state, e);
state.callback(res);
return;
await m_stream.WriteAsync(m_outbuf, 0, bufsize - m_out.avail_out, cancellationToken);
await IntermediateWrite(cancellationToken);
}
}

private async Task IntermediateWrite(CancellationToken cancellationToken)
{
m_out.next_out_index = 0;
m_out.avail_out = bufsize;
int err = m_out.deflate(m_flush);
if (err != zlibConst.Z_STREAM_END)
{
if (err != zlibConst.Z_OK)
{
ZlibStreamAsyncResult res = new ZlibStreamAsyncResult(state.state, new CompressionFailedException("Compress failed: " + err));
state.callback(res);
return;
throw new CompressionFailedException("Compress failed: " + err);
}
}
if (m_out.avail_in == 0)
m_stream.BeginWrite(m_outbuf, 0, bufsize - m_out.avail_out, state.callback, state.state);
await m_stream.WriteAsync(m_outbuf, 0, bufsize - m_out.avail_out, cancellationToken);
else
m_stream.BeginWrite(m_outbuf, 0, bufsize - m_out.avail_out, new AsyncCallback(IntermediateWrite), state);
}

/// <summary>
/// Complete a pending write, when the callback given to BeginWrite is called.
/// </summary>
/// <param name="asyncResult"></param>
public override void EndWrite(IAsyncResult asyncResult)
{
if (asyncResult is ZlibStreamAsyncResult)
{
ZlibStreamAsyncResult ar = (ZlibStreamAsyncResult)asyncResult;
if (ar.Exception != null)
throw ar.Exception;
await m_stream.WriteAsync(m_outbuf, 0, bufsize - m_out.avail_out, cancellationToken);
await IntermediateWrite(cancellationToken);
}
else
m_stream.EndWrite(asyncResult);
return;
}

/// <summary>
Expand Down Expand Up @@ -373,66 +316,5 @@ public override void Write(byte[] buffer, int offset, int count)
m_stream.Write(m_outbuf, 0, bufsize - m_out.avail_out);
}
}

private class ZlibStreamAsyncResult : IAsyncResult
{
private object m_state = null;
private Exception m_exception;

public ZlibStreamAsyncResult(object state)
{
m_state = state;
}

public ZlibStreamAsyncResult(object state, Exception except)
{
m_state = state;
m_exception = except;
}

public Exception Exception
{
get { return m_exception; }
}

#region IAsyncResult Members

public object AsyncState
{
get { return m_state; }
}

public System.Threading.WaitHandle AsyncWaitHandle
{
get
{
throw new Exception("The method or operation is not implemented.");
}
}

public bool CompletedSynchronously
{
get { return true; }
}

public bool IsCompleted
{
get { return true; }
}

#endregion
}

private class ZlibState
{
public AsyncCallback callback;
public object state;

public ZlibState(AsyncCallback callback, object state)
{
this.callback = callback;
this.state = state;
}
}
}
}
Loading

0 comments on commit 4d473b8

Please sign in to comment.