Code Listings
Chapter 20: Asynchronous Methods
Download Asynchronator (Visual Studio Test Project)
Blocking server:
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

/// <summary>
/// Blocking TCP server. Every accepted connection is handed to a thread-pool
/// thread, which reads a fixed-size message, reverses the buffer and writes
/// it back using blocking stream calls.
/// </summary>
public class Server
{
    /// <summary>
    /// Listens on the given endpoint and queues each accepted client to the
    /// thread pool. This method loops forever; it only returns by throwing.
    /// </summary>
    /// <param name="address">Local address to listen on.</param>
    /// <param name="port">Local port to listen on.</param>
    public void Serve (IPAddress address, int port)
    {
        ThreadPool.SetMinThreads (50, 50);    // Refer Chapter 19
        TcpListener listener = new TcpListener (address, port);
        listener.Start();
        try
        {
            while (true)
            {
                TcpClient c = listener.AcceptTcpClient();
                ThreadPool.QueueUserWorkItem (Accept, c);
            }
        }
        finally
        {
            // Release the listening socket if the accept loop ever fails.
            listener.Stop();
        }
    }

    /// <summary>
    /// Thread-pool work item: services a single client and then disposes it.
    /// </summary>
    /// <param name="clientObject">The accepted <see cref="TcpClient"/>.</param>
    private void Accept (object clientObject)
    {
        // This is the top of a thread-pool work item: an exception escaping
        // from here would be unhandled, so a failure on one connection
        // (e.g. the client resetting it) is reported instead of propagated.
        try
        {
            using (TcpClient client = (TcpClient) clientObject)
            using (NetworkStream n = client.GetStream())
            {
                byte[] data = new byte [5000];
                int bytesRead = 0;
                int chunkSize = 1;

                // Keep reading until the buffer is full or Read returns 0.
                while (bytesRead < data.Length && chunkSize > 0)
                {
                    chunkSize = n.Read (data, bytesRead, data.Length - bytesRead);   // BLOCKS
                    bytesRead += chunkSize;
                }

                // The whole buffer is reversed and sent back, regardless of
                // how many bytes were actually received.
                Array.Reverse (data);
                n.Write (data, 0, data.Length);                                      // BLOCKS
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine ("Error: " + ex.Message);
        }
    }
}
Non-blocking server:
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace AsynchronousMethods.Nonblocking
{
    /// <summary>
    /// TCP server that accepts clients on the calling thread and services
    /// each one with non-blocking (Begin/End) stream operations.
    /// </summary>
    public class Server
    {
        /// <summary>
        /// Listens on the given endpoint and starts a <see cref="ReverseEcho"/>
        /// exchange for every accepted client. This method loops forever; it
        /// only returns by throwing.
        /// </summary>
        /// <param name="address">Local address to listen on.</param>
        /// <param name="port">Local port to listen on.</param>
        public void Serve (IPAddress address, int port)
        {
            ThreadPool.SetMinThreads (50, 50);
            TcpListener listener = new TcpListener (address, port);
            listener.Start();
            try
            {
                while (true)
                {
                    TcpClient c = listener.AcceptTcpClient();
                    ThreadPool.QueueUserWorkItem (ReverseEcho, c);
                }
            }
            finally
            {
                // Release the listening socket if the accept loop ever fails.
                listener.Stop();
            }
        }

        /// <summary>
        /// Thread-pool work item: kicks off the exchange for one client.
        /// </summary>
        /// <param name="client">The accepted <see cref="TcpClient"/>.</param>
        void ReverseEcho (object client)
        {
            new ReverseEcho().Begin ((TcpClient) client);
        }
    }

    /// <summary>
    /// State for one client exchange: fill a fixed-size buffer from the
    /// client, reverse it, write it back, then close the connection.
    /// </summary>
    class ReverseEcho
    {
        volatile TcpClient _client;
        volatile NetworkStream _stream;
        byte [] _data = new byte [5000];
        volatile int _bytesRead = 0;

        /// <summary>
        /// Stores the client, obtains its stream and issues the first read.
        /// Any failure is routed to <see cref="ProcessException"/>.
        /// </summary>
        internal void Begin (TcpClient c)
        {
            try
            {
                _client = c;
                _stream = c.GetStream();
                Read();
            }
            catch (Exception ex) { ProcessException (ex); }
        }

        /// <summary>
        /// Read in a non-blocking fashion, into the unfilled tail of the buffer.
        /// </summary>
        void Read()
        {
            _stream.BeginRead (_data, _bytesRead, _data.Length - _bytesRead,
                               ReadCallback, null);
        }

        /// <summary>
        /// Completes a read. Issues another read while data keeps arriving and
        /// the buffer is not full; otherwise reverses the buffer and starts
        /// writing it back.
        /// </summary>
        void ReadCallback (IAsyncResult r)
        {
            try
            {
                int chunkSize = _stream.EndRead (r);
                _bytesRead += chunkSize;
                if (chunkSize > 0 && _bytesRead < _data.Length)
                {
                    Read();       // More data to read!
                    return;
                }

                // The whole buffer is reversed and sent back, regardless of
                // how many bytes were actually received.
                Array.Reverse (_data);
                _stream.BeginWrite (_data, 0, _data.Length, WriteCallback, null);
            }
            catch (Exception ex) { ProcessException (ex); }
        }

        /// <summary>
        /// Completes the write and closes the connection.
        /// </summary>
        void WriteCallback (IAsyncResult r)
        {
            try { _stream.EndWrite (r); }
            catch (Exception ex)
            {
                // ProcessException already performs the cleanup, so return
                // here rather than falling through to a second Cleanup().
                ProcessException (ex);
                return;
            }
            Cleanup();
        }

        /// <summary>
        /// Closes the connection and reports the error on the console.
        /// </summary>
        void ProcessException (Exception ex)
        {
            Cleanup();
            Console.WriteLine ("Error: " + ex.Message);
        }

        /// <summary>
        /// Closes the stream and the client if they were obtained.
        /// </summary>
        void Cleanup()
        {
            if (_stream != null) _stream.Close();
            if (_client != null) _client.Close();
        }
    }
}
Writing asynchronous methods:
/// <summary>
/// Exposes the reverse-echo exchange as a Begin/End asynchronous method pair.
/// </summary>
public class MessagingServices
{
    /// <summary>
    /// Starts a reverse-echo exchange with <paramref name="client"/>.
    /// </summary>
    /// <param name="client">Connected client to exchange data with.</param>
    /// <param name="callback">Invoked when the exchange finishes; may be null.</param>
    /// <param name="userState">Returned unchanged via <see cref="IAsyncResult.AsyncState"/>.</param>
    /// <returns>A token to pass to <see cref="EndReverseEcho"/>.</returns>
    public static IAsyncResult BeginReverseEcho (TcpClient client,
                                                 AsyncCallback callback,
                                                 object userState)
    {
        var re = new ReverseEcho();
        re.Begin (client, callback, userState);
        return re;
    }

    /// <summary>
    /// Waits for the exchange to finish and returns the buffer that was sent
    /// back, re-throwing any error recorded during the exchange.
    /// </summary>
    /// <param name="r">The value returned by <see cref="BeginReverseEcho"/>.</param>
    public static byte [] EndReverseEcho (IAsyncResult r)
    {
        return ((ReverseEcho) r).End();
    }
}

/// <summary>
/// State and <see cref="IAsyncResult"/> token for one reverse-echo exchange:
/// fill a fixed-size buffer from the client, reverse it, write it back, then
/// signal completion.
/// </summary>
class ReverseEcho : IAsyncResult
{
    volatile TcpClient _client;
    volatile NetworkStream _stream;
    volatile object _userState;
    volatile AsyncCallback _callback;
    readonly ManualResetEvent _waitHandle = new ManualResetEvent (false);
    volatile int _bytesRead = 0;
    readonly byte [] _data = new byte [Program.MessageLength];
    volatile Exception _exception;

    internal ReverseEcho() { }

    // IAsyncResult members:

    public object AsyncState { get { return _userState; } }

    public WaitHandle AsyncWaitHandle { get { return _waitHandle; } }

    public bool CompletedSynchronously { get { return false; } }

    // A zero-timeout wait reports whether the handle is signaled without blocking.
    public bool IsCompleted { get { return _waitHandle.WaitOne (0, false); } }

    /// <summary>
    /// Stores the caller's arguments, obtains the client's stream and issues
    /// the first read. A failure is recorded and completes the operation.
    /// </summary>
    internal void Begin (TcpClient c, AsyncCallback callback, object state)
    {
        _client = c;
        _callback = callback;
        _userState = state;
        try
        {
            _stream = _client.GetStream();
            Read();
        }
        catch (Exception ex) { ProcessException (ex); }
    }

    /// <summary>
    /// Wait for completion + re-throw any error. The wait handle is closed
    /// here, before the result is returned.
    /// </summary>
    internal byte [] End()
    {
        AsyncWaitHandle.WaitOne();
        AsyncWaitHandle.Close();
        if (_exception != null) throw _exception;
        return _data;
    }

    /// <summary>
    /// Issues a non-blocking read into the unfilled tail of the buffer.
    /// This is always called from an exception-handled method.
    /// </summary>
    void Read()
    {
        _stream.BeginRead (_data, _bytesRead, _data.Length - _bytesRead,
                           ReadCallback, null);
    }

    /// <summary>
    /// Completes a read. Issues another read while data keeps arriving and
    /// the buffer is not full; otherwise reverses the buffer and starts
    /// writing it back.
    /// </summary>
    void ReadCallback (IAsyncResult r)
    {
        try
        {
            int chunkSize = _stream.EndRead (r);
            _bytesRead += chunkSize;
            if (chunkSize > 0 && _bytesRead < _data.Length)
            {
                Read();       // More data to read!
                return;
            }

            // The whole buffer is reversed and sent back, regardless of
            // how many bytes were actually received.
            Array.Reverse (_data);
            _stream.BeginWrite (_data, 0, _data.Length, WriteCallback, null);
        }
        catch (Exception ex) { ProcessException (ex); }
    }

    /// <summary>
    /// Completes the write and finishes the operation.
    /// </summary>
    void WriteCallback (IAsyncResult r)
    {
        try { _stream.EndWrite (r); }
        catch (Exception ex) { ProcessException (ex); return; }
        Cleanup();
    }

    /// <summary>
    /// Records the error so that it gets re-thrown when the consumer calls
    /// the End method, then finishes the operation.
    /// </summary>
    void ProcessException (Exception ex)
    {
        _exception = ex;
        Cleanup();
    }

    /// <summary>
    /// Closes the stream, signals completion and fires the callback.
    /// </summary>
    void Cleanup()
    {
        try
        {
            if (_stream != null) _stream.Close();
        }
        catch (Exception ex)
        {
            // Record a failure to close only when no earlier error exists,
            // so the original cause is never overwritten by a secondary one.
            if (_exception == null) _exception = ex;
        }

        // Signal that we're done and fire the callback.
        _waitHandle.Set();
        if (_callback != null) _callback (this);
    }
}
© 2007, O'Reilly Media, Inc. All rights reserved