Pages

Sunday, April 24, 2011

Asynchronous HTTP Client

I recently came across the need to create an asynchronous HTTP client and I went through some examples online but I found that asynchronous socket connections can be somewhat difficult to manage. Due to the "unreliable" nature of the internet, there can be multiple exceptions and errors that may leave your HTTP client hanging in an invalid state. So instead of writing numerous exception cases and verifications at each stage, I tried to simplify the design by making the client behave like a state machine. So let's look at some of the code...

First, we'll start with the underlying data required for each asynchronous request (which I called an AsyncTask):


/// <summary>
/// Holds everything the client needs to track for one asynchronous request:
/// the target (host, port, path, resolved addresses), the receive buffer and
/// the running byte counters, plus the document text accumulated so far.
/// </summary>
public class AsyncTask
{
    /// <summary>
    /// Creates the bookkeeping for a request and resolves <paramref name="host"/>
    /// through <see cref="Dns.GetHostEntry(string)"/>. The lookup happens
    /// synchronously inside the constructor.
    /// </summary>
    /// <param name="host">Host name to resolve and to send in the request.</param>
    /// <param name="port">TCP port to connect to.</param>
    /// <param name="path">Path portion of the resource to request.</param>
    public AsyncTask(string host, int port, string path)
    {
        // Request target.
        Host = host;
        Port = port;
        Path = path;
        Addresses = Dns.GetHostEntry(host).AddressList;

        // Receive-side state.
        ReceiveBuffer = new byte[256];
        TotalBytesReceived = 0;

        // Must start non-zero: a value of 0 is what the receive loop treats
        // as "nothing more to read".
        BytesReceived = -1;
    }

    // ---- request target ----

    /// <summary>Host name the request is addressed to.</summary>
    public string Host { get; private set; }

    /// <summary>TCP port of the remote end point.</summary>
    public int Port { get; private set; }

    /// <summary>Path of the requested resource.</summary>
    public string Path { get; private set; }

    /// <summary>Addresses returned by the DNS lookup for <see cref="Host"/>.</summary>
    public IPAddress[] Addresses { get; private set; }

    // ---- receive state ----

    /// <summary>Fixed 256-byte buffer that each receive operation fills.</summary>
    public byte[] ReceiveBuffer { get; private set; }

    /// <summary>Sum of the bytes delivered by all receive operations so far.</summary>
    public int TotalBytesReceived { get; set; }

    /// <summary>Bytes delivered by the most recent receive operation (-1 before the first one).</summary>
    public int BytesReceived { get; set; }

    /// <summary>Text of the response accumulated so far.</summary>
    public string DocSource { get; set; }
}


In addition to a data structure to hold the internals associated with each task, we also need an enumeration of the possible client states. I've come up with 11 states, but you can get by with 7 just fine. The reason I have the extra 4 states is just to get a bit more information about what the client is currently doing.


/// <summary>
/// States of the HTTP client's state machine. The numeric values are the
/// same ones the compiler would assign implicitly; they are spelled out so
/// the ordering is visible at a glance.
/// </summary>
public enum EClientState
{
    /// <summary>Idle; the only state from which a new request may start.</summary>
    Available = 0,

    /// <summary>A connection attempt has been started.</summary>
    Connecting = 1,

    /// <summary>The connection attempt succeeded; sending follows immediately.</summary>
    Connected = 2,

    /// <summary>The connection is being torn down.</summary>
    Disconnecting = 3,

    /// <summary>The connection has been torn down; clean-up follows immediately.</summary>
    Disconnected = 4,

    /// <summary>The request is being sent.</summary>
    Sending = 5,

    /// <summary>The request was sent; receiving follows immediately.</summary>
    SendDone = 6,

    /// <summary>The response is being read.</summary>
    Receiving = 7,

    /// <summary>The response has been read; disconnecting follows immediately.</summary>
    ReceiveDone = 8,

    /// <summary>Something went wrong; clean-up follows immediately.</summary>
    Failed = 9,

    /// <summary>The request is being wrapped up before the client becomes available again.</summary>
    CleanUp = 10
}


The core function of the client is the ChangeState method, which acts as the brain of the client: it manages the changes of state and enforces strict rules on how state changes can occur. If the client attempts to perform an invalid state change, then the method throws an exception. The idea is that if there is a failure at any stage, then the client can just change the state to Failed and the class will automatically clean up the state in order to resume "normal" operation.


/// <summary>
/// Moves the client to <paramref name="state"/> and starts the work that
/// belongs to that state. Every transition is validated against the current
/// state first; an illegal transition is rejected with an
/// <see cref="InvalidOperationException"/>.
/// </summary>
/// <param name="state">The state to enter.</param>
/// <exception cref="InvalidOperationException">
/// The requested transition is not allowed from the current state. Any
/// <see cref="InvalidOperationException"/> raised by the work started here
/// is passed through unchanged as well.
/// </exception>
private void ChangeState(EClientState state)
{
    lock (_sync) // re-entrant lock: nested transitions on the same thread are fine
    {
        try
        {
            // CleanUp is never rejected, but a socket that is still connected
            // has to be disconnected first; the disconnect path comes back
            // to CleanUp once the socket is down.
            if (state == EClientState.CleanUp && _socket.Connected)
            {
                ChangeState(EClientState.Disconnecting);
                return;
            }

            if (!IsTransitionAllowed(_clientState, state))
            {
                ThrowInvalidStateException(_clientState, state);
            }

            _clientState = state;
            // Uncomment to trace transitions:
            //Console.WriteLine("Client " + _clientId + ": " + _clientState);

            // Kick off whatever the new state stands for. Pass-through states
            // (Connected, SendDone, ...) immediately chain to the next one.
            switch (state)
            {
                case EClientState.Connecting:
                    BeginConnect();
                    break;
                case EClientState.Connected:
                    ChangeState(EClientState.Sending);
                    break;
                case EClientState.Disconnecting:
                    BeginDisconnect();
                    break;
                case EClientState.Disconnected:
                    ChangeState(EClientState.CleanUp);
                    break;
                case EClientState.Sending:
                    BeginSend();
                    break;
                case EClientState.SendDone:
                    ChangeState(EClientState.Receiving);
                    break;
                case EClientState.Receiving:
                    BeginReceive();
                    break;
                case EClientState.ReceiveDone:
                    ChangeState(EClientState.Disconnecting);
                    break;
                case EClientState.Failed:
                    ChangeState(EClientState.CleanUp);
                    break;
                case EClientState.CleanUp:
                    CleanUp();
                    break;
                case EClientState.Available:
                    // Nothing to start; the client simply waits for the next request.
                    break;
            }
        }
        catch (InvalidOperationException)
        {
            // Plain "throw" keeps the original stack trace ("throw e" would reset it).
            // NOTE: this also lets ObjectDisposedException through, because it
            // derives from InvalidOperationException.
            throw;
        }
        catch (Exception e)
        {
            Console.WriteLine("ChangeState Exception:\n\t[Source]: " + e.Source + "\n\t[Message]: " + e.Message);
            ChangeState(EClientState.Failed);
        }
    }
}

// Transition table of the state machine: true when moving from 'current' to 'target' is legal.
private static bool IsTransitionAllowed(EClientState current, EClientState target)
{
    switch (target)
    {
        case EClientState.Available:
            return current == EClientState.CleanUp;
        case EClientState.Connecting:
            return current == EClientState.Available;
        case EClientState.Connected:
            return current == EClientState.Connecting;
        case EClientState.Disconnecting:
            return current != EClientState.Available && current != EClientState.CleanUp;
        case EClientState.Disconnected:
            return current == EClientState.Disconnecting;
        case EClientState.Sending:
            return current == EClientState.Connected;
        case EClientState.SendDone:
            return current == EClientState.Sending;
        case EClientState.Receiving:
            return current == EClientState.SendDone || current == EClientState.Receiving;
        case EClientState.ReceiveDone:
            return current == EClientState.Receiving;
        case EClientState.Failed:
            return current != EClientState.CleanUp;
        case EClientState.CleanUp:
            return true;
        default:
            // Values that are not part of the enum are never valid targets.
            return false;
    }
}


Finally, here is the client itself:


public class AsyncHttpClient:IHttpClient
{
// Number of bytes in one kilobyte; used to express sizes below.
private static readonly int KILOBYTES = 1024;
// Ceiling that BeginReceive compares against the running byte total before asking for more data.
private static readonly int _maxPageSize = 128 * KILOBYTES; // limit the page size to 128 KB

// Socket for the current request; Get(host, port) assigns a new instance for every request.
private Socket _socket;
// Identifier given to the constructor; handed back to OnGetCompleted subscribers.
private readonly int _clientId;
// Current position in the state machine; normally changed through ChangeState.
private EClientState _clientState;

// Target, buffer and counters of the request in flight.
private AsyncTask _asyncTask;
// Object locked by Get, ChangeState and the connect/receive callbacks.
private readonly object _sync;

/// <summary>
/// Creates an idle client. A configured socket and a placeholder task are
/// set up so the instance is fully initialised before the first request.
/// </summary>
/// <param name="clientId">Identifier reported back through <c>OnGetCompleted</c>.</param>
public AsyncHttpClient(int clientId)
{
    // Send and receive timeouts: 10 seconds each.
    const int timeoutMilliseconds = 10000;

    _clientId = clientId;
    _sync = new object();
    _clientState = EClientState.Available;

    // Non-blocking TCP socket over IPv4.
    Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    socket.Blocking = false;
    socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeoutMilliseconds);
    socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeoutMilliseconds);
    _socket = socket;

    // Placeholder task (empty host and path, port 80) until Get() supplies a real one.
    _asyncTask = new AsyncTask(string.Empty, 80, string.Empty);
}

#region IHttpClient Members
/// <summary>
/// Raised from CleanUp when a request has finished. Subscribers receive the
/// accumulated document text, the host of the request and this client's id.
/// </summary>
public event OnGetCompleteCallback OnGetCompleted;

/// <summary>
/// Starts a request to <paramref name="host"/> on port 80.
/// </summary>
/// <param name="host">Host name to fetch from.</param>
public void Get(string host)
{
    const int defaultHttpPort = 80;
    Get(host, defaultHttpPort);
}

/// <summary>
/// Starts an asynchronous request to <paramref name="host"/> on
/// <paramref name="port"/>. The call returns once the connection attempt has
/// been started; completion is reported through <c>OnGetCompleted</c>.
/// </summary>
/// <param name="host">Host name to fetch from.</param>
/// <param name="port">TCP port to connect to.</param>
/// <exception cref="InvalidOperationException">
/// The client is still busy with a previous request.
/// </exception>
public void Get(string host, int port)
{
    lock (_sync)
    {
        if (_clientState != EClientState.Available)
        {
            // Do not invoke a client that's already working!
            throw new InvalidOperationException("The client is not available! Current state: " + _clientState.ToString());
        }

        try
        {
            // Every request gets a fresh socket. Close the one it replaces:
            // it is either the socket created by the constructor or the one
            // left over from the previous request, and dropping the reference
            // without closing it would leak the underlying handle.
            Socket previous = _socket;
            _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            if (previous != null)
            {
                previous.Close();
            }

            // Set the socket to be non-blocking
            _socket.Blocking = false;

            // Set the send and receive timeouts to 10 seconds
            _socket.SetSocketOption(SocketOptionLevel.Socket,
                                    SocketOptionName.SendTimeout,
                                    10000);

            _socket.SetSocketOption(SocketOptionLevel.Socket,
                                    SocketOptionName.ReceiveTimeout,
                                    10000);

            // Resolves the host; a failed lookup lands in the catch below.
            _asyncTask = new AsyncTask(host, port, string.Empty);
            ChangeState(EClientState.Connecting);
        }
        catch (Exception e)
        {
            Console.WriteLine("Exception: " + e.Message);
            ChangeState(EClientState.Failed);
        }
    }
}
#endregion

/// <summary>
/// Moves the client to <paramref name="state"/> and starts the work that
/// belongs to that state. Every transition is validated against the current
/// state first; an illegal transition is rejected with an
/// <see cref="InvalidOperationException"/>.
/// </summary>
/// <param name="state">The state to enter.</param>
/// <exception cref="InvalidOperationException">
/// The requested transition is not allowed from the current state. Any
/// <see cref="InvalidOperationException"/> raised by the work started here
/// is passed through unchanged as well.
/// </exception>
private void ChangeState(EClientState state)
{
    lock (_sync) // re-entrant lock: nested transitions on the same thread are fine
    {
        try
        {
            // CleanUp is never rejected, but a socket that is still connected
            // has to be disconnected first; the disconnect path comes back
            // to CleanUp once the socket is down.
            if (state == EClientState.CleanUp && _socket.Connected)
            {
                ChangeState(EClientState.Disconnecting);
                return;
            }

            if (!IsTransitionAllowed(_clientState, state))
            {
                ThrowInvalidStateException(_clientState, state);
            }

            _clientState = state;
            // Uncomment to trace transitions:
            //Console.WriteLine("Client " + _clientId + ": " + _clientState);

            // Kick off whatever the new state stands for. Pass-through states
            // (Connected, SendDone, ...) immediately chain to the next one.
            switch (state)
            {
                case EClientState.Connecting:
                    BeginConnect();
                    break;
                case EClientState.Connected:
                    ChangeState(EClientState.Sending);
                    break;
                case EClientState.Disconnecting:
                    BeginDisconnect();
                    break;
                case EClientState.Disconnected:
                    ChangeState(EClientState.CleanUp);
                    break;
                case EClientState.Sending:
                    BeginSend();
                    break;
                case EClientState.SendDone:
                    ChangeState(EClientState.Receiving);
                    break;
                case EClientState.Receiving:
                    BeginReceive();
                    break;
                case EClientState.ReceiveDone:
                    ChangeState(EClientState.Disconnecting);
                    break;
                case EClientState.Failed:
                    ChangeState(EClientState.CleanUp);
                    break;
                case EClientState.CleanUp:
                    CleanUp();
                    break;
                case EClientState.Available:
                    // Nothing to start; the client simply waits for the next request.
                    break;
            }
        }
        catch (InvalidOperationException)
        {
            // Plain "throw" keeps the original stack trace ("throw e" would reset it).
            // NOTE: this also lets ObjectDisposedException through, because it
            // derives from InvalidOperationException.
            throw;
        }
        catch (Exception e)
        {
            Console.WriteLine("ChangeState Exception:\n\t[Source]: " + e.Source + "\n\t[Message]: " + e.Message);
            ChangeState(EClientState.Failed);
        }
    }
}

// Transition table of the state machine: true when moving from 'current' to 'target' is legal.
private static bool IsTransitionAllowed(EClientState current, EClientState target)
{
    switch (target)
    {
        case EClientState.Available:
            return current == EClientState.CleanUp;
        case EClientState.Connecting:
            return current == EClientState.Available;
        case EClientState.Connected:
            return current == EClientState.Connecting;
        case EClientState.Disconnecting:
            return current != EClientState.Available && current != EClientState.CleanUp;
        case EClientState.Disconnected:
            return current == EClientState.Disconnecting;
        case EClientState.Sending:
            return current == EClientState.Connected;
        case EClientState.SendDone:
            return current == EClientState.Sending;
        case EClientState.Receiving:
            return current == EClientState.SendDone || current == EClientState.Receiving;
        case EClientState.ReceiveDone:
            return current == EClientState.Receiving;
        case EClientState.Failed:
            return current != EClientState.CleanUp;
        case EClientState.CleanUp:
            return true;
        default:
            // Values that are not part of the enum are never valid targets.
            return false;
    }
}

/// <summary>
/// Finishes the current request: notifies <c>OnGetCompleted</c> subscribers
/// with the document text, host and client id, then returns the client to
/// the Available state.
/// </summary>
private void CleanUp()
{
    // Take a snapshot of the delegate so a subscriber that unsubscribes on
    // another thread between the null check and the call cannot cause a
    // NullReferenceException.
    OnGetCompleteCallback handler = OnGetCompleted;
    if (handler != null)
    {
        try
        {
            handler(_asyncTask.DocSource, _asyncTask.Host, _clientId);
        }
        catch (Exception ex)
        {
            // A faulty subscriber must not keep the client from becoming
            // available again: if the exception escaped here, the transition
            // to Available below would be skipped and the client would stay
            // in CleanUp, where every further transition is rejected.
            Console.WriteLine("OnGetCompleted Exception:\n\t[Source]: " + ex.Source + "\n\t[Message]: " + ex.Message);
        }
    }

    ChangeState(EClientState.Available);
}

/// <summary>
/// Always throws: reports an illegal state transition.
/// </summary>
/// <param name="currentState">State the client is in.</param>
/// <param name="newState">State that was requested.</param>
/// <exception cref="InvalidOperationException">Thrown on every call.</exception>
private void ThrowInvalidStateException(EClientState currentState, EClientState newState)
{
    string message = String.Format("Cannot change the state from {0} to {1}", currentState, newState);
    throw new InvalidOperationException(message);
}

/// <summary>
/// Starts connecting, beginning with the first resolved address.
/// </summary>
private void BeginConnect()
{
    const int firstEndPoint = 0;
    BeginConnect(firstEndPoint);
}

/// <summary>
/// Starts an asynchronous connection attempt to the resolved address at
/// <paramref name="endPointIndex"/>. The outcome is handled by
/// <c>ConnectCallback</c>, which moves on to the next address on failure.
/// </summary>
/// <param name="endPointIndex">Index into the task's address list.</param>
private void BeginConnect(int endPointIndex)
{
    if (_asyncTask.Addresses.Length <= endPointIndex)
    {
        // We don't have any more end points to connect to. Nothing was ever
        // connected, so this is a failure; going through Disconnecting would
        // rely on the disconnect step acting on a socket that is not connected.
        Console.WriteLine("No end points left to connect to for " + _asyncTask.Host);
        ChangeState(EClientState.Failed);
        return;
    }

    EndPoint ep = new IPEndPoint(_asyncTask.Addresses[endPointIndex], _asyncTask.Port);

    // Setup the async event arguments
    SocketAsyncEventArgs e = new SocketAsyncEventArgs();

    e.DisconnectReuseSocket = true;
    e.RemoteEndPoint = ep;
    e.Completed += new EventHandler<SocketAsyncEventArgs>(ConnectCallback);
    e.UserToken = endPointIndex + 1; // index of the end point to try next if this one fails

    bool completedAsync = false;
    try
    {
        completedAsync = _socket.ConnectAsync(e);
    }
    catch (SocketException se)
    {
        Console.WriteLine("Socket Exception: " + se.ErrorCode + " Message: " + se.Message);

        // The operation never ran, so e.SocketError still holds its initial
        // value (Success). Record the real error; otherwise the callback
        // below would treat the failed attempt as an established connection.
        e.SocketError = se.SocketErrorCode;
    }

    if (!completedAsync)
    {
        // Either the call completed synchronously or it failed to start;
        // in both cases the Completed event is not raised, so invoke the
        // callback ourselves.
        ConnectCallback(this, e);
    }
}

/// <summary>
/// Handles the result of a connection attempt: advances to Connected on
/// success, tries the next resolved address on failure, and gives up with
/// Failed once the address list is exhausted.
/// </summary>
/// <param name="sender">Source of the completion (not used).</param>
/// <param name="e">Arguments of the finished connect; UserToken carries the index of the next end point.</param>
private void ConnectCallback(object sender, SocketAsyncEventArgs e)
{
    lock (_sync) // re-entrant lock
    {
        if (e.SocketError == SocketError.Success)
        {
            ChangeState(EClientState.Connected);
            return;
        }

        int addressCount = _asyncTask.Addresses.Length;
        int nextEndPoint = (int)e.UserToken;

        if (nextEndPoint < addressCount)
        {
            // Another address is left: try that one.
            BeginConnect(nextEndPoint);
            return;
        }

        Console.WriteLine("Socket Error: {0} when connecting to {1}",
                          e.SocketError,
                          _asyncTask.Host);
        ChangeState(EClientState.Failed);
    }
}

/// <summary>
/// Tears down the connection and advances to Disconnected. The socket is
/// not reused, so it is shut down (when connected) and closed right here
/// rather than through an asynchronous disconnect.
/// </summary>
private void BeginDisconnect()
{
    if (_socket.Connected)
    {
        try
        {
            _socket.Shutdown(SocketShutdown.Both);
        }
        catch (SocketException se)
        {
            // The peer may already have dropped the connection. The socket
            // is closed below either way, so log and carry on.
            Console.WriteLine("Socket Exception: " + se.ErrorCode + " Message: " + se.Message);
        }
    }

    // Always close and always advance: when the socket is not (or no longer)
    // connected there is nothing to shut down, but the state machine still
    // has to leave Disconnecting, otherwise the client never becomes
    // available again.
    _socket.Close();
    ChangeState(EClientState.Disconnected);
}

/// <summary>
/// Completion handler for an asynchronous disconnect: advances to
/// Disconnected on success, otherwise logs the error and moves to Failed.
/// </summary>
/// <param name="sender">Source of the completion (not used).</param>
/// <param name="e">Arguments of the finished disconnect.</param>
private void DisconnectCallback(object sender, SocketAsyncEventArgs e)
{
    bool disconnected = e.SocketError == SocketError.Success;

    if (!disconnected)
    {
        Console.WriteLine("Socket Error: {0} when disconnecting from {1}",
                          e.SocketError,
                          _asyncTask.Host);
        ChangeState(EClientState.Failed);
        return;
    }

    ChangeState(EClientState.Disconnected);
}

/// <summary>
/// Builds the HTTP GET request for the current task and sends it
/// asynchronously. The outcome is handled by <c>SendCallback</c>; if the
/// send cannot even be started the client moves to Failed.
/// </summary>
private void BeginSend()
{
    // The state is already Sending at this point: ChangeState sets it before
    // calling this method, so it is not assigned again here.
    string request = "GET /" + _asyncTask.Path + " HTTP/1.1\r\nHost: " + _asyncTask.Host + "\r\nConnection: Close\r\n\r\n";
    byte[] buffer = Encoding.ASCII.GetBytes(request);

    SocketAsyncEventArgs e = new SocketAsyncEventArgs();
    e.SetBuffer(buffer, 0, buffer.Length);
    e.Completed += new EventHandler<SocketAsyncEventArgs>(SendCallback);

    bool completedAsync;
    try
    {
        completedAsync = _socket.SendAsync(e);
    }
    catch (SocketException se)
    {
        Console.WriteLine("Socket Exception: " + se.ErrorCode + " Message: " + se.Message);

        // Nothing was sent. Do not fall through to SendCallback: the args
        // still carry their initial SocketError (Success), so the callback
        // would report the request as sent.
        ChangeState(EClientState.Failed);
        return;
    }

    if (!completedAsync)
    {
        // The call completed synchronously so invoke the callback ourselves
        SendCallback(this, e);
    }
}

/// <summary>
/// Handles the result of the send: advances to SendDone on success,
/// otherwise logs the error and moves to Failed.
/// </summary>
/// <param name="sender">Source of the completion (not used).</param>
/// <param name="e">Arguments of the finished send.</param>
private void SendCallback(object sender, SocketAsyncEventArgs e)
{
    EClientState next = EClientState.SendDone;

    if (e.SocketError != SocketError.Success)
    {
        Console.WriteLine("Socket Error: {0} when sending to {1}",
                          e.SocketError,
                          _asyncTask.Host);
        next = EClientState.Failed;
    }

    ChangeState(next);
}

/// <summary>
/// Requests the next chunk of the response, or moves to ReceiveDone when
/// the last receive delivered no bytes or the page-size limit has been
/// passed. Does nothing unless the client is in the Receiving state.
/// </summary>
private void BeginReceive()
{
    if (_clientState != EClientState.Receiving)
    {
        return;
    }

    // BytesReceived == 0 means the previous receive returned nothing more;
    // the second test caps the amount of data kept per page.
    bool moreToRead = _asyncTask.BytesReceived != 0 && _asyncTask.TotalBytesReceived <= _maxPageSize;
    if (!moreToRead)
    {
        //Console.WriteLine("Num bytes received: " + _asyncTask.TotalBytesReceived);
        ChangeState(EClientState.ReceiveDone);
        return;
    }

    SocketAsyncEventArgs e = new SocketAsyncEventArgs();
    e.SetBuffer(_asyncTask.ReceiveBuffer, 0, _asyncTask.ReceiveBuffer.Length);
    e.Completed += new EventHandler<SocketAsyncEventArgs>(ReceiveCallback);
    e.UserToken = _asyncTask.Host; // lets the callback detect completions that belong to another request

    bool completedAsync;
    try
    {
        completedAsync = _socket.ReceiveAsync(e);
    }
    catch (SocketException se)
    {
        Console.WriteLine("Error receiving data from: " + _asyncTask.Host);
        Console.WriteLine("SocketException: {0} Error Code: {1}", se.Message, se.NativeErrorCode);

        ChangeState(EClientState.Failed);

        // The receive never started; there is no result to hand to the
        // callback, and the failure has already been processed above.
        return;
    }

    if (!completedAsync)
    {
        // The call completed synchronously so invoke the callback ourselves
        ReceiveCallback(this, e);
    }
}

/// <summary>
/// Handles a finished receive: appends the received bytes (decoded as
/// ASCII) to the document and asks for the next chunk. Completions that
/// arrive while the client is not receiving are ignored; completions for a
/// different host only produce a warning; socket errors move the client to
/// Failed.
/// </summary>
/// <param name="sender">Source of the completion (not used).</param>
/// <param name="args">Arguments of the finished receive; UserToken carries the host the receive was started for.</param>
private void ReceiveCallback(object sender, SocketAsyncEventArgs args)
{
    lock (_sync) // re-entrant lock
    {
        // Fast fail: no data should arrive unless the client is receiving.
        if (_clientState != EClientState.Receiving)
        {
            return;
        }

        String requestedHost = (String)args.UserToken;

        if (_asyncTask.Host != requestedHost)
        {
            Console.WriteLine("Warning: received a callback for {0}, but the client is currently working on {1}.",
                              requestedHost, _asyncTask.Host);
            return;
        }

        if (args.SocketError != SocketError.Success)
        {
            Console.WriteLine("Socket Error: {0} when receiving from {1}",
                              args.SocketError,
                              _asyncTask.Host);
            ChangeState(EClientState.Failed);
            return;
        }

        try
        {
            int chunkLength = args.BytesTransferred;

            _asyncTask.BytesReceived = chunkLength;
            _asyncTask.TotalBytesReceived += chunkLength;
            _asyncTask.DocSource += Encoding.ASCII.GetString(_asyncTask.ReceiveBuffer, 0, chunkLength);

            // Keep reading; BeginReceive decides when the response is complete.
            BeginReceive();
        }
        catch (SocketException e)
        {
            Console.WriteLine("Error receiving data from: " + requestedHost);
            Console.WriteLine("SocketException: {0} Error Code: {1}", e.Message, e.NativeErrorCode);

            ChangeState(EClientState.Failed);
        }
    }
}
}