Code Listings
Chapter 19: Threading
Getting started:
class ThreadTest { static void Main() { Thread t = new Thread (WriteY); // Kick off a new thread t.Start(); // running WriteY() // Simultaneously, do something on the main thread. for (int i = 0; i < 1000; i++) Console.Write ("x"); } static void WriteY() { for (int i = 0; i < 1000; i++) Console.Write ("y"); } }
Passing data to a thread:
static void Main() { Thread t = new Thread (Print); t.Start ("Hello from t!"); Print ("Hello from the main thread!"); } static void Print (object messageObj) { string message = (string) messageObj; Console.WriteLine (message); }
The outer variable trap:
static void Main() { string text = "t1"; Thread t1 = new Thread (delegate() { Print (text); }); text = "t2"; Thread t2 = new Thread (delegate() { Print (text); }); t1.Start(); t2.Start(); } static void Print (string message) { Console.WriteLine (message); }
Separate local variable stacks:
static void Main() { new Thread (Go).Start(); // Call Go() on a new thread Go(); // Call Go() on the main thread } static void Go() { // Declare and use a local variable - 'cycles' for (int cycles = 0; cycles < 5; cycles++) Console.Write (cycles); }
Sharing data through a common field:
class ThreadTest { static void Main() { Introducer intro = new Introducer(); intro.Message = "Hello"; new Thread (intro.Run).Start(); Console.ReadLine(); Console.WriteLine (intro.Reply); } } class Introducer { public string Message; public string Reply; public void Run() { Console.WriteLine (Message); Reply = "Hi right back!"; } }
Thread pooling:
static void Main() { ThreadPool.QueueUserWorkItem (Go); ThreadPool.QueueUserWorkItem (Go, 123); Console.ReadLine(); } static void Go (object data) { Console.WriteLine ("Hello from the thread pool! " + data); }
Optimizing the pool:
static void Main() { for (int i = 0; i < 50; i++) ThreadPool.QueueUserWorkItem (Go); } static void Go (object notUsed) { // Compute a hash on a 100,000 byte random byte sequence: byte[] data = new byte [100000]; new Random().NextBytes (data); System.Security.Cryptography.SHA1.Create().ComputeHash (data); }
Exception handling:
public static void Main() { new Thread (Go).Start(); } static void Go() { try { ... throw null; // this exception will get caught below ... } catch (Exception ex) { // Typically log the exception, and/or signal another thread // that we've come unstuck ... } }
Asynchronous delegates:
delegate int WorkInvoker (string text); static void Main() { WorkInvoker method = Work; IAsyncResult cookie = method.BeginInvoke ("test", null, null); // // ... here's where we can do other work in parallel... // int result = method.EndInvoke (cookie); Console.WriteLine ("String length is: " + result); } static int Work (string s) { return s.Length; }
Asynchronous delegates and callbacks:
static void Main() { WorkInvoker method = Work; method.BeginInvoke ("test", Done, method); // ... // } delegate int WorkInvoker (string text); static int Work (string s) { return s.Length; } static void Done (IAsyncResult cookie) { WorkInvoker method = (WorkInvoker) cookie.AsyncState; int result = method.EndInvoke (cookie); Console.WriteLine ("String length is: " + result); }
SimpleThreadState:
public static ThreadState SimpleThreadState (ThreadState ts) { return ts & (ThreadState.Unstarted | ThreadState.WaitSleepJoin | ThreadState.Stopped); }
Basic locking:
class ThreadSafe { static object locker = new object(); static int val1, val2; static void Go() { lock (locker) { if (val2 != 0) Console.WriteLine (val1 / val2); val2 = 0; } } }
Nested locking:
static object x = new object(); static void Main() { lock (x) { Console.WriteLine ("I have the lock"); Nest(); Console.WriteLine ("I still have the lock"); } // Now the lock is released. } static void Nest() { lock (x) { } // We still have the lock on x! }
Using a Mutex to ensure only once instance of an application can run at once:
class OneAtATimePlease { // Naming a Mutex makes it available computer-wide. Use a name that's // unique to your company and application (e.g., include your URL). static Mutex mutex = new Mutex (false, "oreilly.com OneAtATimeDemo"); static void Main() { // Wait a few seconds if contended, in case another instance // of the program is still in the process of shutting down. if (!mutex.WaitOne (TimeSpan.FromSeconds (3), false)) { Console.WriteLine ("Another instance of the app is running. Bye!"); return; } try { Console.WriteLine ("Running. Press Enter to exit"); Console.ReadLine(); } finally { mutex.ReleaseMutex(); } } }
Semaphore:
class TheClub // No door lists! { static Semaphore s = new Semaphore (3, 3); // Available=3; Capacity=3 static void Main() { for (int i = 1; i <= 5; i++) new Thread (Enter).Start (i); } static void Enter (object id) { Console.WriteLine (id + " wants to enter"); s.WaitOne(); Console.WriteLine (id + " is in!"); // Only three threads Thread.Sleep (1000 * (int) id); // can be here at Console.WriteLine (id + " is leaving"); // a time. s.Release(); } }
Thread safety and .NET Framework types:
class ThreadSafe { static List <string> list = new List <string>(); static void Main() { new Thread (AddItems).Start(); new Thread (AddItems).Start(); } static void AddItems() { for (int i = 0; i < 100; i++) lock (list) list.Add ("Item " + list.Count); string[] items; lock (list) items = list.ToArray(); foreach (string s in items) Console.WriteLine (s); } }
Thread safety in application servers:
static class UserCache { static Dictionary <int, User> _users = new Dictionary <int, User>(); internal static User GetUser (int id) { User u = null; lock (_users) if (_users.TryGetValue (id, out u)) return u; u = RetrieveUser (id); // Method to retrieve from database; lock (_users) _users [id] = u; return u; } }
Atomicity and Interlocked:
class Program { static long sum; static void Main() { // sum // Simple increment/decrement operations: Interlocked.Increment (ref sum); // 1 Interlocked.Decrement (ref sum); // 0 // Add/subtract a value: Interlocked.Add (ref sum, 3); // 3 // Read a 64-bit field: Console.WriteLine (Interlocked.Read (ref sum)); // 3 // Write a 64-bit field while reading previous value: // (This prints "3" while updating sum to 10) Console.WriteLine (Interlocked.Exchange (ref sum, 10)); // 10 // Update a field only if it matches a certain value (10): Interlocked.CompareExchange (ref sum, 123, 10); // 123 } }
Memory barriers and volatility:
class Unsafe { static bool endIsNigh, repented; static void Main() { new Thread (Wait).Start(); // Start up the spinning waiter Thread.Sleep (1000); // Give it a second to warm up! repented = true; endIsNigh = true; } static void Wait() { while (!endIsNigh); // Spin until endIsNigh Console.Write (repented); } }
Basic signaling with an event wait handle:
class BasicWaitHandle { static EventWaitHandle wh = new AutoResetEvent (false); static void Main() { new Thread (Waiter).Start(); Thread.Sleep (1000); // Pause for a second... wh.Set(); // Wake up the Waiter. } static void Waiter() { Console.WriteLine ("Waiting..."); wh.WaitOne(); // Wait for notification Console.WriteLine ("Notified"); } }
Two-way signaling with EventWaitHandle:
class TwoWaySignaling { static EventWaitHandle ready = new AutoResetEvent (false); static EventWaitHandle go = new AutoResetEvent (false); static volatile string message; // We must either use volatile // or lock around this field static void Main() { new Thread (Work).Start(); ready.WaitOne(); // First wait until worker is ready message = "ooo"; go.Set(); // Tell worker to go! ready.WaitOne(); message = "ahhh"; // Give the worker another message go.Set(); ready.WaitOne(); message = null; // Signal the worker to exit go.Set(); } static void Work() { while (true) { ready.Set(); // Indicate that we're ready go.WaitOne(); // Wait to be kicked off... if (message == null) return; // Gracefully exit Console.WriteLine (message); } } }
Pooling wait handles:
class Test { static ManualResetEvent starter = new ManualResetEvent (false); public static void Main() { ThreadPool.RegisterWaitForSingleObject (starter, Go, "hello", -1, true); Thread.Sleep (5000); Console.WriteLine ("Signaling worker..."); starter.Set(); Console.ReadLine(); } public static void Go (object data, bool timedOut) { Console.WriteLine ("Started " + data); // Perform task... } }
Signaling with Wait and Pulse:
class SimpleWaitPulse { static object locker = new object(); static bool go; static void Main() { // The new thread will block new Thread (Work).Start(); // because go==false. Console.ReadLine(); // Wait for user to hit Enter lock (locker) // Let's now wake up the thread by { // setting go=true and pulsing. go = true; Monitor.PulseAll (locker); } } static void Work() { lock (locker) while (!go) Monitor.Wait (locker); Console.WriteLine ("Woken!!!"); } }
Producer/Consumer queue:
using System; using System.Threading; using System.Collections.Generic; public class TaskQueue : IDisposable { object locker = new object(); Thread[] workers; Queue<string> taskQ = new Queue<string>(); public TaskQueue (int workerCount) { workers = new Thread [workerCount]; // Create and start a separate thread for each worker for (int i = 0; i < workerCount; i++) (workers [i] = new Thread (Consume)).Start(); } public void Dispose() { // Enqueue one null task per worker to make each exit. foreach (Thread worker in workers) EnqueueTask (null); } public void EnqueueTask (string task) { lock (locker) { taskQ.Enqueue (task); // We must pulse because we're Monitor.Pulse (locker); // changing a blocking condition. } } void Consume() { while (true) // Keep consuming until { // told otherwise string task; lock (locker) { while (taskQ.Count == 0) Monitor.Wait (locker); task = taskQ.Dequeue(); } if (task == null) return; // This signals our exit Console.Write (task); // Perform task. Thread.Sleep (1000); // Simulate time-consuming task } } }
static void Main() { using (TaskQueue q = new TaskQueue (2)) { for (int i = 0; i < 10; i++) q.EnqueueTask (" Task" + i); Console.WriteLine ("Enqueued 10 tasks"); Console.WriteLine ("Waiting for tasks to complete..."); } // Exiting the using statement runs TaskQueue's Dispose method, which // shuts down the consumers, after all outstanding tasks are completed. Console.WriteLine ("\r\nAll tasks done!"); }
Two-way signaling with Wait/Pulse:
class Solved { static object locker = new object(); static bool ready, go; static void Main() { new Thread (SaySomething).Start(); for (int i = 0; i < 5; i++) lock (locker) { while (!ready) Monitor.Wait (locker); ready = false; go = true; Monitor.PulseAll (locker); } } static void SaySomething() { for (int i = 0; i < 5; i++) lock (locker) { ready = true; Monitor.PulseAll (locker); // Remember that calling while (!go) Monitor.Wait (locker); // Monitor.Wait releases go = false; // and reacquires the lock. Console.WriteLine ("Wassup?"); } } }
Interrupt:
static void Main() { Thread t = new Thread (delegate() { try { Thread.Sleep (Timeout.Infinite); } catch (ThreadInterruptedException) { Console.Write ("Forcibly "); } Console.WriteLine ("Woken!"); }); t.Start(); t.Interrupt(); }
Safe cancellation:
class ProLife { public static void Main() { RulyWorker w = new RulyWorker(); Thread t = new Thread (w.Work); t.Start(); Thread.Sleep (1000); Console.WriteLine ("aborting"); w.Abort(); // Safely abort the worker. Console.WriteLine ("aborted"); } public class RulyWorker { volatile bool abort; public void Abort() { abort = true; } public void Work() { while (true) { CheckAbort(); // Do stuff... try { OtherMethod(); } finally { /* any required cleanup */ } } } void OtherMethod() { // Do stuff... CheckAbort(); } void CheckAbort() { if (abort) Thread.CurrentThread.Abort(); } } }
Local storage:
class Test { // The same LocalDataStoreSlot object can be used across all threads. LocalDataStoreSlot secSlot = Thread.GetNamedDataSlot ("securityLevel"); // This property has a separate value on each thread. int SecurityLevel { get { object data = Thread.GetData (secSlot); return data == null ? 0 : (int) data; // null == uninitialized } set { Thread.SetData (secSlot, value); } } ...
Using BackgroundWorker:
using System; using System.Threading; using System.ComponentModel; class Program { static BackgroundWorker bw; static void Main() { bw = new BackgroundWorker(); bw.WorkerReportsProgress = true; bw.WorkerSupportsCancellation = true; bw.DoWork += bw_DoWork; bw.ProgressChanged += bw_ProgressChanged; bw.RunWorkerCompleted += bw_RunWorkerCompleted; bw.RunWorkerAsync ("Hello to worker"); Console.WriteLine ("Press Enter in the next 5 seconds to cancel"); Console.ReadLine(); if (bw.IsBusy) bw.CancelAsync(); Console.ReadLine(); } static void bw_DoWork (object sender, DoWorkEventArgs e) { for (int i = 0; i <= 100; i += 20) { if (bw.CancellationPending) { e.Cancel = true; return; } bw.ReportProgress (i); Thread.Sleep (1000); // Just for the demo... don't go sleeping } // for real in pooled threads! e.Result = 123; // This gets passed to RunWorkerCompleted } static void bw_RunWorkerCompleted (object sender, RunWorkerCompletedEventArgs e) { if (e.Cancelled) Console.WriteLine ("You cancelled!"); else if (e.Error != null) Console.WriteLine ("Worker exception: " + e.Error.ToString()); else Console.WriteLine ("Complete: " + e.Result); // from DoWork } static void bw_ProgressChanged (object sender, ProgressChangedEventArgs e) { Console.WriteLine ("Reached " + e.ProgressPercentage + "%"); } }
ReaderWriterLockSlim:
class SlimDemo { static ReaderWriterLockSlim rw = new ReaderWriterLockSlim(); static List<int> items = new List<int>(); static Random rand = new Random(); static void Main() { new Thread (Read).Start(); new Thread (Read).Start(); new Thread (Read).Start(); new Thread (Write).Start ("A"); new Thread (Write).Start ("B"); } static void Read() { while (true) { rw.EnterReadLock(); foreach (int i in items) Thread.Sleep (10); rw.ExitReadLock(); } } static void Write (object threadID) { while (true) { int newNumber = GetRandNum (100); rw.EnterWriteLock(); items.Add (newNumber); rw.ExitWriteLock(); Console.WriteLine ("Thread " + threadID + " added " + newNumber); Thread.Sleep (100); } } static int GetRandNum (int max) { lock (rand) return rand.Next (max); } }
Upgradeable locks with ReaderWriterLockSlim:
class SlimDemo { static ReaderWriterLockSlim rw = new ReaderWriterLockSlim(); static List<int> items = new List<int>(); static Random rand = new Random(); static void Main() { new Thread (Read).Start(); new Thread (Read).Start(); new Thread (Read).Start(); new Thread (Write).Start ("A"); new Thread (Write).Start ("B"); } static void Read() { while (true) { rw.EnterReadLock(); foreach (int i in items) Thread.Sleep (10); rw.ExitReadLock(); } } static void Write (object threadID) { while (true) { int newNumber = GetRandNum (100); rw.EnterUpgradeableReadLock(); if (!items.Contains (newNumber)) { rw.EnterWriteLock(); items.Add (newNumber); rw.ExitWriteLock(); Console.WriteLine ("Thread " + threadID + " added " + newNumber); } rw.ExitUpgradeableReadLock(); Thread.Sleep (100); } } static int GetRandNum (int max) { lock (rand) return rand.Next (max); } }
Lock recursion:
var rw = new ReaderWriterLockSlim(); rw.EnterReadLock(); rw.EnterReadLock(); // Exception thrown rw.ExitReadLock(); rw.ExitReadLock();
var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion); rw.EnterWriteLock(); rw.EnterReadLock(); Console.WriteLine (rw.IsReadLockHeld); // True Console.WriteLine (rw.IsWriteLockHeld); // True rw.ExitReadLock(); rw.ExitWriteLock();
Using the Threading Timer:
using System; using System.Threading; class Program { static void Main() { // First interval = 5000ms; subsequent intervals = 1000ms Timer tmr = new Timer (Tick, "tick...", 5000, 1000); Console.ReadLine(); tmr.Dispose(); // Ends the timer } static void Tick (object data) { // This runs on a pooled thread Console.WriteLine (data); // Writes "tick..." } }
Using the System Timer:
using System; using System.Timers; // Timers namespace rather than Threading class SystemTimer { static void Main() { Timer tmr = new Timer(); // Doesn't require any args tmr.Interval = 500; tmr.Elapsed += tmr_Elapsed; // Uses an event instead of a delegate tmr.Start(); // Start the timer Console.ReadLine(); tmr.Stop(); // Stop the timer Console.ReadLine(); tmr.Start(); // Re-start the timer Console.ReadLine(); tmr.Dispose(); // Permanently stop the timer } static void tmr_Elapsed (object sender, EventArgs e) { Console.WriteLine ("Tick"); } }
© 2007, O'Reilly Media, Inc. All rights reserved