Wednesday, March 31, 2010

Load Balancing Actors with Work Stealing Techniques


Actor concurrency is well known to provide a more convenient model for concurrency than traditional thread-based approaches. In contrast with threads, you can have as many actors as you want (memory permitting), and the actor implementation takes care of processing actors on threads behind the scenes whenever necessary.

In Akka, which among other things provides actors for the JVM (in Scala), the component which schedules actors for processing on a set of threads is called the "dispatcher". The default dispatcher processes actors on a fixed-size thread pool. When a message is sent to an actor, a task is scheduled on the thread pool to process all the messages in the actor's mailbox.

What actor concurrency does not provide out of the box is load balancing among equal actors ("equal" meaning they are capable of processing the same types of messages).

You can implement load balancing on top of actor frameworks (example in Akka) by sending messages to a set of actors in some clever way (round robin, always choosing the one with the smallest mailbox, ...), but this has a few limitations. This kind of load balancing is rather "static": once a message is sent to an actor, it stays there until that actor processes it. If another actor finishes its own work early, it can never help the slow actor with messages that have already been distributed.
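Such "static" distribution can be sketched without any framework. The RoundRobinRouter below is made up for illustration and is not an Akka API:

```scala
// A framework-free sketch of "static" round-robin distribution.
// Names and structure are illustrative only.
class RoundRobinRouter[A](workers: Vector[A => Unit]) {
  private var next = 0

  // Each message is assigned to exactly one worker up front and stays
  // there, which is exactly the limitation described above.
  def route(msg: A): Unit = {
    workers(next)(msg)
    next = (next + 1) % workers.size
  }
}
```

Once `route` has handed a message to a worker, no other worker can take it over, no matter how unevenly the processing times turn out.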

Here comes work stealing.

Work stealing is an idea used, for example, in the Fork/Join framework. Applied to actors, it means that idle actors can "steal work" from the tail of the mailbox of busy actors. Of course, actors are not threads, so "idle actors" cannot really do anything: they are not assigned to a thread as long as they stay idle. This is why the implementation is a bit more subtle than the conceptual idea.

I have implemented a work stealing dispatcher for Akka actors. Although it's called "work stealing", the implementation actually behaves more like "work donating", because the victim actor takes the initiative: it donates work to its thief, rather than having the thief steal work from the victim.

There is some cleverness in the dispatcher implementation: it uses ReentrantLock.tryLock to check whether an actor being scheduled on the dispatcher is actually able to process its mailbox (another thread might already be busy doing that). If an actor cannot process messages for this reason, it will select a thief (round robin), donate a message from its own mailbox (a ConcurrentLinkedDeque, using Deque.pollLast) to the thief, and process that message in the thief.

Using the victim's thread to do the "work donating", at a moment when that thread cannot process the victim's mailbox itself because another thread is already doing so, means that no threads are "wasted" on the work stealing process. Furthermore, when selecting a thief, the algorithm makes sure the thief is idle, so that no messages are redistributed to an actor which cannot process them immediately. This also prevents stolen messages from being stolen again by another actor later on.
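The mechanism can be sketched outside of Akka. Everything below, the Mailbox class and its method names included, is illustrative only and not the actual dispatcher code:

```scala
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.locks.ReentrantLock

// Illustrative sketch of try-lock based work donation; not Akka's code.
class Mailbox {
  val messages = new ConcurrentLinkedDeque[String]()
  val processingLock = new ReentrantLock()

  // Called when this mailbox is scheduled. If another thread already
  // holds the lock (i.e. is busy processing this mailbox), donate the
  // message at the tail of the deque to an idle thief instead of waiting.
  // Returns true if we processed the mailbox ourselves.
  def processOrDonate(thief: Mailbox): Boolean =
    if (processingLock.tryLock()) {
      try {
        var msg = messages.pollFirst()
        while (msg != null) {
          // process msg here
          msg = messages.pollFirst()
        }
      } finally {
        processingLock.unlock()
      }
      true
    } else {
      val donated = messages.pollLast() // take from the tail of the victim
      if (donated != null) thief.messages.addLast(donated)
      false
    }
}
```

The key point is that `tryLock` never blocks: the scheduling thread either wins the lock and drains the mailbox, or immediately redirects one pending message elsewhere.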

To use it, you simply declare the work stealing dispatcher in your Akka actors:

val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")

class WorkstealingActor extends Actor {
  messageDispatcher = dispatcher
  def receive = {
    case test => {/* process message */}
  }
}

When you create multiple instances of this actor, they'll all share the same work stealing dispatcher.

Monday, February 15, 2010

Automatically reload log4j configuration in tomcat

I was looking for a way to have Log4j reload its configuration file automatically when it changes, in Tomcat. Log4j can do this with the configureAndWatch method, but the default initialization procedure (simply putting a log4j.properties file on the classpath) doesn't use configureAndWatch. You have to write at least a little bit of code to get Log4j to do this. The easiest way I found to integrate this with Tomcat was to implement a Tomcat LifecycleListener.

import org.apache.catalina.Lifecycle;
import org.apache.catalina.LifecycleEvent;
import org.apache.catalina.LifecycleListener;
import org.apache.log4j.LogManager;
import org.apache.log4j.PropertyConfigurator;

public class Log4JInitializer implements LifecycleListener
{
    private String propertiesFile;

    public String getPropertiesFile()
    {
        return this.propertiesFile;
    }

    public void setPropertiesFile(String propertiesFile)
    {
        this.propertiesFile = propertiesFile;
    }

    @Override
    public void lifecycleEvent(LifecycleEvent event)
    {
        if (Lifecycle.BEFORE_START_EVENT.equals(event.getType()))
            initializeLog4j();
    }

    private void initializeLog4j()
    {
        // configure from file, and let log4j monitor the file for changes
        PropertyConfigurator.configureAndWatch(propertiesFile);

        // shutdown log4j (and its monitor thread) on shutdown
        Runtime.getRuntime().addShutdownHook(new Thread()
        {
            @Override
            public void run()
            {
                LogManager.shutdown();
            }
        });
    }
}

I simply listen for the BEFORE_START_EVENT, and when it occurs (which is once per Tomcat startup) I initialize Log4j using the configureAndWatch method. I also install a shutdown hook to clean up the thread Log4j creates to poll the configuration file for changes (I could also have chosen to listen for Tomcat's AFTER_STOP_EVENT instead).

Package this in a jar, put it on the Tomcat classpath, and you can configure it in your Tomcat server.xml:

<Server>
  ...
  <Listener className="Log4JInitializer" propertiesFile="/path/to/log4j.properties"/>
</Server>


It can't get much easier, and it does what it needs to do.

Wednesday, January 20, 2010

Performance tuning tools for a multi core world

Recently I was performance tuning a Java application on a quad core Intel Xeon X5560. This processor has Hyper-Threading, so it shows up in the operating system as if it had 8 cores.

My application was slower than I wanted it to be, and I was faced with this vmstat output:
procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu------
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 3  0 1048660 209840 199608 4123568    0    0   150    24 1577 2599 13  9 77  1  0
 2  0 1048660 210220 199612 4123712    0    0   104    82 1593 2891 13 10 75  2  0
 1  1 1048660 209260 199628 4124596    0    0   484   210 1537 2567 10  7 77  6  0
 1  1 1048660 213940 199612 4120184    0    0  1232    32 1548 2694  9  7 76  8  0

At first sight, I thought it looked OK. That's between 9 and 13% CPU time in user space, between 7 and 10% in system space, and 75 to 77% idle. So it looked like my application was hardly using the CPU at all...

My application was still slower than I wanted it to be, so I went on looking for reasons why it was not using more of the CPU. I took some thread dumps and used the YourKit profiler to measure the time threads were waiting for locks. I couldn't find anything.

Then I had a look at what "top" had to say, instead of using vmstat. It showed comparable numbers:
Cpu(s): 14.6%us, 10.5%sy,  0.0%ni, 73.2%id,  1.2%wa,  0.0%hi,  0.5%si,  0.0%st

But it says "Cpu(s)", so I hit "1" to get a view per CPU... now things started to get interesting:
Cpu0  : 12.7%us,  8.7%sy,  0.0%ni, 73.3%id,  3.3%wa,  0.0%hi,  2.0%si,  0.0%st
Cpu1  : 13.2%us,  8.3%sy,  0.0%ni, 77.5%id,  0.7%wa,  0.0%hi,  0.3%si,  0.0%st
Cpu2  : 14.3%us,  8.3%sy,  0.0%ni, 77.3%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu3  : 18.0%us,  9.7%sy,  0.0%ni, 70.7%id,  0.7%wa,  0.0%hi,  1.0%si,  0.0%st
Cpu4  : 18.0%us, 17.3%sy,  0.0%ni, 64.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu5  : 15.3%us, 12.0%sy,  0.0%ni, 72.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu6  : 11.6%us,  8.9%sy,  0.0%ni, 79.5%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu7  : 13.3%us,  9.0%sy,  0.0%ni, 77.3%id,  0.3%wa,  0.0%hi,  0.0%si,  0.0%st

Now of course, the problem was staring me in the face. The CPU times of vmstat and top (and many other tools) are relative to the total number of CPUs. With 8 cores, this means that 12.5% represents 100% usage of a single CPU by a single thread. With this in mind, the numbers of vmstat start to make sense: more or less one CPU is fully busy in user space, and another CPU is almost fully busy in system space. In other words, my application would probably be faster if it did less I/O and more concurrent processing.
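A tiny helper makes those aggregate numbers easier to interpret (a sketch, nothing more):

```scala
// Aggregate CPU percentages in vmstat/top are relative to all cores
// combined, so on an 8-core machine 12.5% means one core is fully busy.
// Converting back to "number of busy cores" makes the output readable.
def busyCores(aggregatePercent: Double, totalCores: Int): Double =
  aggregatePercent / 100.0 * totalCores

busyCores(13.0, 8) // about 1.04 cores fully busy in user space
busyCores(9.0, 8)  // about 0.72 of a core busy in system space
```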

It also means that, with more and more CPUs, tools like vmstat are becoming less useful. With only one CPU, you get a scale from 0 to 100; with 8 cores, the scale is effectively reduced to 0 to 12.5. The granularity of vmstat's output is 1 percentage point, so with 100 cores the difference between some random "noise" on an otherwise idle machine and a machine that is idle except for one fully used core becomes invisible.

It's not only vmstat, of course; the CPU profiling output in YourKit, for example, has the same problem. The graph stays around 12.5%.


So to conclude:

I think we might need to update our profiling tools, and add new tools to our profiling toolbox. I experimented a little with mpstat, for example, which gives much the same information as vmstat, but does so per CPU.

And of course, as we all know by now, we also have to update our applications. Concurrency is not going away any time soon.

Sunday, January 3, 2010

A Scala library for IPv4 related concepts

I've been using the Xmas holidays to write lots of Scala. One of the things I wrote, partially just to learn some more Scala, partially because I believe it can be useful, is a library to work with IPv4 related concepts: IP addresses, IP networks, ranges of IP addresses etc.

In this post, I'll give a short overview of the library, and share some thoughts about my Scala experience while writing it.

The library exposes a few core concepts which can be useful when doing calculations with IP addresses etc.

IpAddress

IpAddress represents an IPv4 address.
val address = new IpAddress("192.168.0.1")
IpAddress can be used to make calculations on IP addresses, such as finding the next address.
val address = new IpAddress("192.168.0.1")
val next = address + 1
println(next == new IpAddress("192.168.0.2")) // prints true
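The library's internal representation isn't shown in this post, but a plausible sketch makes the arithmetic obvious. Keeping the address in the lower 32 bits of a Long is my assumption here, not necessarily what the library does:

```scala
// Hypothetical internals: an IPv4 address as the lower 32 bits of a Long.
def parse(s: String): Long =
  s.split('.').foldLeft(0L)((acc, octet) => (acc << 8) | octet.toLong)

def format(value: Long): String =
  (3 to 0 by -1).map(i => (value >> (8 * i)) & 0xFF).mkString(".")

format(parse("192.168.0.1") + 1) // "192.168.0.2"
```

With this representation, "the next address" is literally just `+ 1` on a Long.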

IpAddressRange

IpAddressRange represents a contiguous range of consecutive addresses.
val range = new IpAddressRange(new IpAddress("192.168.0.1"), new IpAddress("192.168.0.5"))
println(range.contains(new IpAddress("192.168.0.3"))) // prints true

IpNetworkMask

IpNetworkMask represents a network mask, to be used in an IpNetwork.
val mask1 = new IpNetworkMask("255.255.255.128")
val mask2 = IpNetworkMask.fromPrefixLength(25)
println(mask1 == mask2) // prints true

val invalid = new IpNetworkMask("255.255.255.100") // throws exception
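A sketch of what the prefix-length conversion and the validity check could look like internally; again, the 32-bits-in-a-Long representation is my assumption, not the library's actual code:

```scala
// A /length mask is `length` one-bits followed by zero-bits.
def maskFromPrefixLength(length: Int): Long =
  if (length == 0) 0L else (0xFFFFFFFFL << (32 - length)) & 0xFFFFFFFFL

// A valid mask has all its one-bits contiguous at the top, so it must
// equal the mask of some prefix length between 0 and 32.
def isValidMask(mask: Long): Boolean =
  (0 to 32).exists(len => maskFromPrefixLength(len) == mask)
```

255.255.255.100 (0xFFFFFF64) has non-contiguous one-bits, so no prefix length produces it, which is why the constructor rejects it.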

IpNetwork

An IpNetwork is a range (extends IpAddressRange) that can be expressed as a network address and a network mask.
val network1 = new IpNetwork(new IpAddress("192.168.0.0"), new IpNetworkMask("255.255.255.0"))
val network2 = new IpNetwork("192.168.0.0/24")
println(network1 == network2) // prints true
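The underlying relationship can be sketched with plain Longs (illustrative values, not the library's code): an address belongs to a network when masking it yields the network address.

```scala
val network = 0xC0A80000L // 192.168.0.0
val mask    = 0xFFFFFF00L // 255.255.255.0, i.e. /24

// Membership test: mask the address and compare with the network address.
def inNetwork(address: Long): Boolean = (address & mask) == network

inNetwork(0xC0A8007FL) // 192.168.0.127 -> true
inNetwork(0xC0A80180L) // 192.168.1.128 -> false
```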

IpAddressPool

An IpAddressPool is like a range (extends IpAddressRange) of which certain addresses are "allocated" and others are "free". It could for example be used as part of a DHCP server implementation.
var pool = new IpAddressPool(new IpAddress("1.2.3.4"), new IpAddress("1.2.3.10"))
println(pool.isFree(new IpAddress("1.2.3.6"))) // prints true
pool.allocate(new IpAddress("1.2.3.6")) match {
    case (newPool, allocated) => {
            println(newPool.isFree(new IpAddress("1.2.3.6"))) // prints false
    }
}

And Much More

This was only a short introduction. Much more can be done with these types. Have a look at the scaladoc to get an idea of the possibilities.

What I liked about Scala

I became a fan of Scala some time ago already, but there were a few things which struck me as especially nice or elegant while writing this library:
Lazy lists
The IpAddressRange class has a method to list all addresses in the range, and the IpAddressPool class has methods to list all the free and all the allocated addresses. With big ranges, it becomes practically impossible to return an array or similar collection containing all the addresses. Scala has the Stream concept to deal with this. A Stream is effectively a "lazy list": a List of which the elements are only evaluated when they are needed. With a recursive implementation, this results in very elegant code:
def addresses(): Stream[IpAddress] = {
    if (first < last) {
        Stream.cons(first, new IpAddressRange(first + 1, last).addresses)
    } else {
        Stream.cons(first, Stream.empty)
    }
}
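The same lazy-list pattern works with plain Longs, which shows that taking a few elements from a huge range never materializes the whole list:

```scala
// Same recursive Stream pattern as above, with Longs instead of
// IpAddress, so the snippet is self-contained.
def range(first: Long, last: Long): Stream[Long] =
  if (first < last) Stream.cons(first, range(first + 1, last))
  else Stream.cons(first, Stream.empty)

// Only the three requested elements are ever computed, even though the
// range spans four billion values.
range(0L, 4000000000L).take(3).toList // List(0, 1, 2)
```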
Functional or Procedural
Although I try to write most of my Scala code in as "functional" a style as possible, sometimes it is nice to be able to fall back on a procedural implementation. To do calculations on IP addresses and network masks, a lot of "bit twiddling" is required. I usually found it easier to write that in a procedural style than purely functionally (although that could also be due to my limited functional experience), e.g.:
def fromLongToPrefixLength(value: Long): Int = {
    val lsb: Long = value & 0xFFFFFFFFL
    var result: Int = 0
    var bit: Long = 1L << 31

    while (((lsb & bit) != 0) && (result < 32)) {
      bit = bit >> 1
      result += 1
    }
    result
}
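For contrast, here is a sketch of a more functional take on the same computation, counting the leading one-bits of the lower 32 bits:

```scala
// Functional equivalent of the while loop above: count how many
// consecutive bits are set, starting from bit 31 downwards.
def prefixLength(value: Long): Int = {
  val lsb = value & 0xFFFFFFFFL
  (0 until 32).takeWhile(i => (lsb & (1L << (31 - i))) != 0).length
}

prefixLength(0xFFFFFF80L) // 25, i.e. 255.255.255.128 is a /25 mask
```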
Option
The Option type needs no introduction, but it's so much more fun than using null values all over the place.
Immutability
Scala favours immutable types. This naturally steered me towards making all types in my library immutable as well. Because you can rely on immutable data structures in the Scala library (List, SortedSet, ...), it is much easier than in Java to get it right.
What I might dislike about Scala
I encountered a few things which seemed a bit awkward or strange. Most of these issues are probably due to my lack of experience, so I'll give myself some more time to figure out whether they are really things I dislike or not.
Tuples
Tuples, for example, are a concept that I am in doubt about. I used them extensively in the IpAddressPool class. For example, when allocating an address in a pool, the method returns the new pool (because the IpAddressPool is immutable) together with an Option of the allocated address (None if the address was not free):
def allocate(toAllocate: IpAddress): (IpAddressPool, Option[IpAddress]) = {...}
This is really easy on the implementation side, but I'm not sure whether I like the client code as much. You either have to access the values in the tuple with "._1" and "._2", which doesn't read easily, or you have to pattern match, which seems a bit verbose. You can also assign the result to a new tuple directly:
val pool = new IpAddressPool(new IpAddress("1.2.3.1"), new IpAddress("1.2.3.10"))
val (newPool, allocatedAddress) = pool.allocate(new IpAddress("1.2.3.6"))
but it doesn't seem to be possible to assign the result of a subsequent invocation to the same tuple again (using a var):
val pool = new IpAddressPool(new IpAddress("1.2.3.1"), new IpAddress("1.2.3.10"))
var (newPool, allocatedAddress) = pool.allocate(new IpAddress("1.2.3.6"))

// this won't compile
(newPool, allocatedAddress) = pool.allocate(new IpAddress("1.2.3.7"))
Collections
I've had some usability issues with the collections library. The collections library is being reworked for the upcoming Scala 2.8 release, so things will definitely improve.
I also found a bug in the TreeSet implementation of SortedSet, causing the tree to become heavily unbalanced, which results in StackOverflowErrors when recursively traversing it (I hope this can be fixed for 2.8 as well).

Conclusion

Besides these minor issues, it was a very positive experience. I happen to have a Java version of this library as well, and I certainly like the Scala version much better (both in using and in writing it).