Send ActionScript Worker Messages 6x Faster With Mutex

While ActionScript Workers made their debut in Flash Player 11.4, the Mutex class didn’t arrive until the next version: 11.5. This class is specifically designed to solve a subtle problem that cropped up in the last article. As you’ll see in this article, it does the job quite well! The result is even faster message passing between workers/threads, which is often key to using multi-core CPUs efficiently.

The Mutex class is special in a couple of ways. First, when you call Worker.setSharedProperty and pass in a Mutex, the Mutex object is not copied to the other worker. Instead, both workers end up with the same Mutex object. This behavior is key to how Mutex works, as you’ll soon see.
The second special functionality is the sole reason for Mutex to exist. It provides a mechanism for you to “lock” and “unlock” it, but only one worker can lock the Mutex at a time. Let’s see a quick example:
function doubleFirstByte(bytes:ByteArray, mutex:Mutex): void
{
	// Lock the Mutex. If the Mutex is already locked by another worker,
	// this worker will be suspended until the Mutex is unlocked. At that
	// point the lock() function will return.
	mutex.lock();
 
	// The Mutex has now been locked by our thread. In this example we
	// take that to mean that the ByteArray is free for us to manipulate.
	// Here we simply double the first byte's value.
	bytes[0] *= 2;
 
	// Now that we're done with the Mutex we need to tell other workers
	// so they can resume and their calls to lock() will return. Calling
	// unlock does exactly this.
	mutex.unlock();
} 
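For two workers to call doubleFirstByte on the same data, both need the same Mutex and the same ByteArray. The following is a minimal sketch of that setup, not code from the test app. The property names "mutex" and "bytes" and both function names are hypothetical, and it assumes the worker has already been created. It also assumes the ByteArray is marked shareable (a property added alongside Mutex in Flash Player 11.5); without that, setSharedProperty would hand the worker a copy and the Mutex would be guarding nothing.

```actionscript
import flash.concurrent.Mutex;
import flash.system.Worker;
import flash.utils.ByteArray;

// Main thread: create the shared objects and hand them to the worker.
// "worker" is assumed to come from WorkerDomain.current.createWorker().
function shareWithWorker(worker:Worker): void
{
	var mutex:Mutex = new Mutex();

	var bytes:ByteArray = new ByteArray();
	bytes.shareable = true; // share the memory instead of copying it
	bytes.writeByte(1);

	// Hypothetical property names; any agreed-upon strings would do
	worker.setSharedProperty("mutex", mutex);
	worker.setSharedProperty("bytes", bytes);
}

// Worker thread: fetch the same objects by the same (hypothetical) names
function useSharedObjects(): void
{
	var mutex:Mutex = Worker.current.getSharedProperty("mutex") as Mutex;
	var bytes:ByteArray = Worker.current.getSharedProperty("bytes") as ByteArray;
	if (mutex != null && bytes != null)
	{
		// doubleFirstByte is the function shown above
		doubleFirstByte(bytes, mutex);
	}
}
```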
Imagine two workers running the doubleFirstByte function. If it weren’t for the Mutex they might execute in an arbitrary order. Here is one scenario where it would all go wrong:
Initial value is set to 1
Worker A executes and reads 1
Worker B executes and reads 1
Worker A executes and writes 2
Worker B executes and writes 2
Final result after two runs: 2
Expected result: 4 
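The bad interleaving listed above can be replayed deterministically on a single thread. This sketch is an illustration only, not real worker code: readA and readB are hypothetical stand-ins for the value each worker holds privately between its read and its write.

```actionscript
// Single-threaded replay of the lost update (illustration only)
function replayLostUpdate(): int
{
	var shared:int = 1;      // Initial value is set to 1

	var readA:int = shared;  // Worker A executes and reads 1
	var readB:int = shared;  // Worker B executes and reads 1

	shared = readA * 2;      // Worker A executes and writes 2
	shared = readB * 2;      // Worker B writes 2, overwriting A's update

	return shared;           // 2 rather than the expected 4
}
```

With the lock held around the whole read-multiply-write sequence, Worker B cannot read until Worker A has written, so the second run reads 2 and writes 4.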
Any number of scenarios could play out because you don’t know the order that the threads will execute in. Hoping for the best simply won’t do, so you need a way to communicate between the threads and avoid contention over the shared ByteArray. The first article used a MessageChannel to communicate, but it was quite slow. The last article passed messages directly via Worker.setSharedProperty and achieved a 2.5x speedup, but it employed a tight loop to continuously call Worker.getSharedProperty until a response from the other worker was received:
// Check for the incoming message over and over until it's received
while (worker.getSharedProperty("message") == inMsg)
{
} 
Essentially, the receiving worker would spend 100% of the CPU time it was allotted to simply check for incoming messages. With a Mutex, it can simply suspend execution until one is received.
Let’s see how that strategy works out. The following is an amended version of the last article’s small test application that includes a Mutex-based test as shown above.
package
{
	import flash.display.Sprite;
	import flash.events.Event;
	import flash.utils.getTimer;
	import flash.utils.ByteArray;
	import flash.text.TextField;
	import flash.text.TextFieldAutoSize;
	import flash.system.Worker;
	import flash.system.WorkerDomain;
	import flash.system.WorkerState;
	import flash.system.MessageChannel;
	import flash.concurrent.Mutex;
 
	/**
	* Test to show the speed of passing messages between workers via three
	* methods: MessageChannel, setSharedProperty with a tight polling loop, and
	* setSharedProperty with a Mutex.
	* @author Jackson Dunstan (JacksonDunstan.com)
	*/
	public class MutexTest extends Sprite
	{
		/** Output logger */
		private var logger:TextField = new TextField();
 
		/**
		* Log a CSV row
		* @param cols Columns of the row
		*/
		private function row(...cols): void
		{
			logger.appendText(cols.join(",")+"\n");
		}
 
		/** Message channel from the main thread to the worker thread */
		private var mainToWorker:MessageChannel;
 
		/** Message channel from the worker thread to the main thread */
		private var workerToMain:MessageChannel;
 
		/** The worker thread (main thread only) */
		private var worker:Worker;
 
		/** Number of messages to send back and forth in the test */
		private var REPS:int = 1000;
 
		/** Time before the message passing test started */
		private var beforeTime:int;
 
		/** Current message index */
		private var cur:int;
 
		/**
		* Start the app in main thread or worker thread mode
		*/
		public function MutexTest()
		{
			// Setup the logger
			logger.autoSize = TextFieldAutoSize.LEFT;
			addChild(logger);
 
			// If this is the main SWF, start the main thread
			if (Worker.current.isPrimordial)
			{
				startMainThread();
			}
			// If this is the worker thread SWF, start the worker thread
			else
			{
				startWorkerThread();
			}
		}
 
		/**
		* Start the main thread
		*/
		private function startMainThread(): void
		{
			// Try to get a very good framerate
			stage.frameRate = 60;
 
			// Create the worker from our own SWF bytes
			worker = WorkerDomain.current.createWorker(this.loaderInfo.bytes);
 
			// Create a message channel to send to the worker thread
			mainToWorker = Worker.current.createMessageChannel(worker);
			worker.setSharedProperty("mainToWorker", mainToWorker);
 
			// Create a message channel to receive from the worker thread
			workerToMain = worker.createMessageChannel(Worker.current);
			workerToMain.addEventListener(Event.CHANNEL_MESSAGE, onWorkerToMainDirect);
			worker.setSharedProperty("workerToMain", workerToMain);
 
			// Start the worker
			worker.start();
 
			// Begin the test where we use MessageChannel to send messages
			// between the threads by sending the first message
			beforeTime = getTimer();
			mainToWorker.send("1");
		}
 
		/**
		* Start the worker thread
		*/
		private function startWorkerThread(): void
		{
			// Get the message channels the main thread set up for communication
			// between the threads
			mainToWorker = Worker.current.getSharedProperty("mainToWorker");
			workerToMain = Worker.current.getSharedProperty("workerToMain");
			mainToWorker.addEventListener(Event.CHANNEL_MESSAGE, onMainToWorker);
		}
 
		/**
		* Callback for when a message has been received from the main thread to
		* the worker thread on a MessageChannel
		* @param ev CHANNEL_MESSAGE event
		*/
		private function onMainToWorker(ev:Event): void
		{
			// Record the message and send a response
			cur++;
			workerToMain.send("1");
 
			// If this was the last message, prepare for the next test where the
			// two threads communicate with shared properties
			if (cur == REPS)
			{
				// We receive "1" on our own Thread (the worker thread) and send
				// "2" in response for the setSharedProperty and mutex tests
				setSharedPropertyTest(Worker.current, "1", "2", false);
				mutexTest(Worker.current, "1", "2", false);
			}
		}
 
		/**
		* Callback for when the worker thread sends a message to the main thread
		* via a MessageChannel
		* @param ev CHANNEL_MESSAGE event
		*/
		private function onWorkerToMainDirect(ev:Event): void
		{
			// Record the message and show progress (this version is slow)
			cur++;
			logger.text = "MessageChannel: " + cur + " / " + REPS;
 
			// If this wasn't the last message, send another message to the
			// worker thread
			if (cur < REPS)
			{
				mainToWorker.send("1");
				return;
			}
 
			// The MessageChannel test is done. Record the time it took.
			var afterTime:int = getTimer();
			var messageChannelTime:int = afterTime - beforeTime;
 
			// Run the setSharedProperty test where the two threads communicate
			// by directly setting shared properties on the worker thread. The
			// main thread receives "2", sends "1", and is responsible for
			// starting the process with an initial "1" message.
			beforeTime = getTimer();
			setSharedPropertyTest(worker, "2", "1", true);
			afterTime = getTimer();
			var setSharedPropertyTime:int = afterTime - beforeTime;
 
			// Run the mutex test where the two threads communicate by directly
			// setting shared properties on the worker thread and pause
			// execution via a mutex instead of constantly polling via a tight
			// while loop. The main thread receives "2", sends "1", and is
			// responsible for starting the process with an initial "1" message.
			beforeTime = getTimer();
			mutexTest(worker, "2", "1", true);
			afterTime = getTimer();
			var mutexTime:int = afterTime - beforeTime;
 
			// Clear the logger and show the results instead
			logger.text = "";
			row("Type", "Time", "Messages/sec");
			// getTimer() reports milliseconds, so convert to seconds before
			// dividing the message count by the elapsed time
			var messagesPerSecond:Number = REPS / (messageChannelTime / 1000.0);
			row("MessageChannel", messageChannelTime, messagesPerSecond);
			messagesPerSecond = REPS / (setSharedPropertyTime / 1000.0);
			row("setSharedProperty", setSharedPropertyTime, messagesPerSecond);
			messagesPerSecond = REPS / (mutexTime / 1000.0);
			row("mutex", mutexTime, messagesPerSecond);
		}
 
		/**
		* Perform the setSharedProperty test where the two threads communicate
		* by directly setting shared properties on the worker thread. This
		* version uses a tight polling loop to repeatedly check until the
		* incoming message is ready.
		* @param worker Worker the shared properties are set on
		* @param inMsg Expected message this thread receives from the other
		* @param outMsg Message to send to the other thread
		* @param sendInitial If an initial message should be sent
		*/
		private function setSharedPropertyTest(
			worker:Worker,
			inMsg:String,
			outMsg:String,
			sendInitial:Boolean
		): void
		{
			// Reset the count from the first test
			cur = 0;
 
			// Optionally send an initial outgoing message to start the process
			if (sendInitial)
			{
				worker.setSharedProperty("message", outMsg);
			}
 
			// Send messages until we've hit the limit
			while (cur < REPS)
			{
				// Check to see if the shared property is the incoming message
				if (worker.getSharedProperty("message") == inMsg)
				{
					// Record the message and send a response by setting the
					// shared property to the outgoing message
					cur++;
					worker.setSharedProperty("message", outMsg);
				}
			}
		}
 
		/**
		* Perform the mutex test where the two threads communicate by directly
		* setting shared properties on the worker thread. This version uses a
		* Mutex to pause execution of the worker/thread until the incoming
		* message is ready.
		* @param worker Worker the shared properties are set on
		* @param inMsg Expected message this thread receives from the other
		* @param outMsg Message to send to the other thread
		* @param sendInitial If an initial message should be sent
		*/
		private function mutexTest(
			worker:Worker,
			inMsg:String,
			outMsg:String,
			sendInitial:Boolean
		): void
		{
			// Reset the count from the previous test
			cur = 0;
 
			// Optionally send an initial outgoing message to start the process
			var mutex:Mutex;
			if (sendInitial)
			{
				mutex = new Mutex();
				worker.setSharedProperty("mutex", mutex);
				worker.setSharedProperty("message", outMsg);
			}
			else
			{
				// Wait for the main thread to send the mutex
				do
				{
					mutex = worker.getSharedProperty("mutex") as Mutex;
				}
				while (mutex == null);
			}
 
			// Send messages until we've hit the limit
			while (cur < REPS)
			{
				// Wait for the other thread to unlock the mutex. When they do,
				// the incoming message is ready.
				mutex.lock();
 
				// Record the message and send a response by setting the
				// shared property to the outgoing message
				cur++;
				worker.setSharedProperty("message", outMsg);
 
				// Notify the other thread that the outgoing message is ready
				// by unlocking the mutex
				mutex.unlock();
			}
		}
	}
} 
Run the test
I ran this test in the following environment:
Release version of Flash Player 11.9.900.117
2.3 GHz Intel Core i7
Mac OS X 10.9.0
Google Chrome 30.0.1599.101
ASC 2.0.0 build 353981 (-debug=false -verbose-stacktraces=false -inline -optimize=true)
And here are the results I got:
Type               Time (ms)  Messages/sec
MessageChannel     153        6535.9
setSharedProperty  62         16129
mutex              25         40000
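As a check on these numbers, the rates and speedups follow directly from the measured times, assuming the 1000 round trips set by REPS and times in milliseconds as reported by getTimer():

```latex
\begin{align*}
\text{messages/sec} &= \frac{\text{REPS}}{t_{\text{ms}} / 1000} \\
\text{MessageChannel:} \quad & \frac{1000}{0.153} \approx 6535.9 \\
\text{setSharedProperty:} \quad & \frac{1000}{0.062} \approx 16129 \\
\text{mutex:} \quad & \frac{1000}{0.025} = 40000 \\
\text{mutex vs. setSharedProperty:} \quad & \frac{62}{25} = 2.48\times \\
\text{mutex vs. MessageChannel:} \quad & \frac{153}{25} = 6.12\times
\end{align*}
```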
The Mutex code is definitely doing its job. Each worker/thread no longer needs to spend 100% of its CPU time continually checking for messages, but instead can simply suspend until a message is ready. This pays off with an over 2x speedup from the setSharedProperty method and over 6x speedup from the convenient-yet-slow MessageChannel method. Is the speedup worth requiring Flash Player 11.5? That decision is up to you.
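If you would rather not hard-require Flash Player 11.5, one option is to check for Mutex at runtime and fall back to polling. The following is a minimal sketch under stated assumptions, not a tested recipe: the function name canUseMutex is hypothetical, and it assumes that looking the class up by name avoids any direct reference to Mutex on players that lack it. Treat its behavior on older players as an assumption to verify.

```actionscript
import flash.system.ApplicationDomain;

// Returns true when flash.concurrent.Mutex exists and reports itself usable.
// The class is looked up by name so this function never references it directly.
function canUseMutex(): Boolean
{
	var domain:ApplicationDomain = ApplicationDomain.currentDomain;
	if (!domain.hasDefinition("flash.concurrent.Mutex"))
	{
		return false;
	}

	// Mutex.isSupported is a static property, read here via the class object
	var mutexClass:Object = domain.getDefinition("flash.concurrent.Mutex");
	return mutexClass["isSupported"] === true;
}
```

A caller could use the Mutex-based approach when this returns true and the tight polling loop otherwise. Any code that names the Mutex type directly would still need to be kept out of the path taken on older players.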

Posted by Regan at December 24, 2013 - 11:56 AM