Monday, September 3, 2012

We are hiring!

We (NGDATA) are looking for Java Software Engineers (with or without big data experience) for both our R&D team and our professional services team. There are a number of open positions, so both senior and junior profiles are welcome. You will be working on both the open source and enterprise versions of Lily using big data technology such as Hadoop, HBase and Solr. We have offices in Belgium (Ghent) and the US (San Francisco and New York).

Interested? Contact us, or email me directly at janvanbesien AT ngdata DOT com.

Wednesday, October 12, 2011

A Java library for IPv6

A while ago, I wrote a library to work with IPv4 concepts (IPv4 addresses, networks, network masks, prefix lengths, ranges of addresses etc.) in Scala. Along those lines, I recently created a similar library to work with IPv6 concepts. This time though, I used good old Java.

If you simply need to make IPv6 connections, java.net.InetAddress will be sufficient. However, when doing anything a little more advanced with the addresses themselves, I find java.net.InetAddress (both Inet4Address and Inet6Address) lacking a lot of functionality.

Here is a short overview of what can be done with my java-ipv6 library.


The class IPv6Address represents an IPv6 address.

final IPv6Address iPv6Address = IPv6Address.fromString("fe80::226:2dff:fefa:cd1f");

Internally, the IPv6Address class uses two long values to store the IPv6 address. This makes for an optimized implementation, and a lot of bit twiddling fun for me while writing it...

IPv6Address can be used to make simple calculations on IPv6 addresses, such as addition and subtraction.

final IPv6Address iPv6Address = IPv6Address.fromString("fe80::226:2dff:fefa:cd1f");
final IPv6Address next = iPv6Address.add(1);
final IPv6Address previous = iPv6Address.subtract(1);
System.out.println(next.toString()); // prints fe80::226:2dff:fefa:cd20
System.out.println(previous.toString()); // prints fe80::226:2dff:fefa:cd1e
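
To illustrate the two-long representation mentioned above, here is a minimal sketch of how addition and subtraction could carry or borrow between the two halves. It is not the library's actual code: the class `TwoLongAddress`, its fields and the `toHex` method are hypothetical names, and only non-negative offsets are handled.

```java
// Hypothetical sketch: a 128-bit value kept in two longs, with add/subtract.
final class TwoLongAddress {
    final long high; // most significant 64 bits
    final long low;  // least significant 64 bits

    TwoLongAddress(long high, long low) {
        this.high = high;
        this.low = low;
    }

    // Adds a non-negative offset, carrying into the high bits on overflow.
    TwoLongAddress add(int value) {
        if (value < 0) {
            throw new IllegalArgumentException("value must be >= 0");
        }
        long newLow = low + value;
        // The low half wrapped around if the sum is (unsigned) smaller than before.
        long carry = Long.compareUnsigned(newLow, low) < 0 ? 1 : 0;
        return new TwoLongAddress(high + carry, newLow);
    }

    // Subtracts a non-negative offset, borrowing from the high bits on underflow.
    TwoLongAddress subtract(int value) {
        if (value < 0) {
            throw new IllegalArgumentException("value must be >= 0");
        }
        long newLow = low - value;
        // The low half wrapped around if the difference is (unsigned) larger than before.
        long borrow = Long.compareUnsigned(newLow, low) > 0 ? 1 : 0;
        return new TwoLongAddress(high - borrow, newLow);
    }

    // Plain 32-digit hex rendering, without IPv6 "::" shortening.
    String toHex() {
        return String.format("%016x%016x", high, low);
    }

    public static void main(String[] args) {
        // fe80::226:2dff:fefa:cd1f split into its upper and lower 64 bits
        TwoLongAddress a = new TwoLongAddress(0xfe80000000000000L, 0x02262dfffefacd1fL);
        System.out.println(a.add(1).toHex());      // fe8000000000000002262dfffefacd20
        System.out.println(a.subtract(1).toHex()); // fe8000000000000002262dfffefacd1e
    }
}
```

The unsigned comparison is what makes the carry detection work, since Java longs are signed and the low half may well have its top bit set.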


The class IPv6AddressRange represents a contiguous range of IPv6 addresses.

final IPv6AddressRange range = new IPv6AddressRange(IPv6Address.fromString("fe80::226:2dff:fefa:cd1f"),
System.out.println(range.contains(IPv6Address.fromString("fe80::226:2dff:fefa:dcba"))); // prints true

IPv6AddressRange contains methods to iterate over all the addresses in the range. Ranges can be compared with other ranges by checking if they overlap or if one range contains the other range.


An IPv6Network is a range (extends IPv6AddressRange) that can be expressed as a network address and a prefix length.

final IPv6Network range = new IPv6Network(IPv6Address.fromString("fe80::226:2dff:fefa:0"),
final IPv6Network network = IPv6Network.fromString("fe80::226:2dff:fefa:0/112");
System.out.println(range.equals(network)); // prints true

Note that every IPv6Network is also an IPv6AddressRange, but not all IPv6AddressRanges are valid IPv6Networks. That is why, when constructing an IPv6Network from a first address and a last address, the smallest possible IPv6Network that contains both addresses (i.e. the one with the longest prefix length) will be constructed.
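
As a rough illustration of "the smallest network containing a first and a last address": its prefix length is the number of leading bits the two addresses have in common. The sketch below is an assumption about how this could be computed on a two-long representation; `CommonPrefix` and `prefixLength` are hypothetical names, not the library's API.

```java
// Hypothetical sketch: prefix length of the smallest network spanning two addresses.
final class CommonPrefix {

    // Each address is passed as (high, low): its upper and lower 64 bits.
    static int prefixLength(long firstHigh, long firstLow, long lastHigh, long lastLow) {
        long highDiff = firstHigh ^ lastHigh;
        if (highDiff != 0) {
            // The addresses already differ somewhere in the upper 64 bits.
            return Long.numberOfLeadingZeros(highDiff);
        }
        // Upper halves are equal; count the common leading bits of the lower halves.
        // numberOfLeadingZeros(0) is 64, so identical addresses give 128.
        return 64 + Long.numberOfLeadingZeros(firstLow ^ lastLow);
    }

    public static void main(String[] args) {
        long high = 0xfe80000000000000L;
        // fe80::226:2dff:fefa:0 up to fe80::226:2dff:fefa:ffff
        System.out.println(prefixLength(high, 0x02262dfffefa0000L, high, 0x02262dfffefaffffL)); // 112
    }
}
```

The result of 112 matches the /112 network used in the example above.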


An IPv6AddressPool is like a range (extends IPv6AddressRange) of which certain subnets are "allocated" and others are "free".

final IPv6AddressPool pool = new IPv6AddressPool(IPv6Address.fromString("fe80::226:2dff:fefa:0"),
                                                 IPv6Address.fromString("fe80::226:2dff:fefa:ffff"), 120);
System.out.println(pool.isFree(IPv6Network.fromString("fe80::226:2dff:fefa:5ff/120"))); // prints true
final IPv6AddressPool newPool = pool.allocate(IPv6Network.fromString("fe80::226:2dff:fefa:5ff/120"));
System.out.println(newPool.isFree(IPv6Network.fromString("fe80::226:2dff:fefa:5ff/120"))); // prints false

This was only a short introduction. Much more can be done with these types. I invite you to have a look at the javadoc and of course the actual source code.


I decided to make all types immutable. For things like IPv6Address and IPv6Network, this obviously makes sense because they represent immutable concepts. For IPv6AddressPool, I was in doubt whether immutability was the right choice. When allocating an address in a pool, immutability means I have to return a new IPv6AddressPool instance. Internally the IPv6AddressPool maintains a SortedSet of all ranges of addresses that are still available in the pool. This SortedSet (a TreeSet) thus has to be copied each time a new IPv6AddressPool is created. It is not a deep copy (the IPv6AddressRanges in the SortedSet are immutable themselves), but constructing a new TreeSet instance each time still seems suboptimal. It would be nice to investigate whether I can improve the situation by replacing the TreeSet with some kind of persistent data structure. I had a quick look at pcollections, but it doesn't seem to provide an alternative to TreeSet with SortedSet semantics. Other suggestions are much appreciated!
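
The copy-on-allocate trade-off described above can be sketched as follows. This is a simplified illustration, not the library's implementation: `ImmutablePoolSketch` is a hypothetical class, and plain integers stand in for the free address ranges.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch of an immutable pool; integers stand in for free ranges.
final class ImmutablePoolSketch {
    private final SortedSet<Integer> free;

    private ImmutablePoolSketch(SortedSet<Integer> free) {
        this.free = Collections.unmodifiableSortedSet(free);
    }

    static ImmutablePoolSketch of(Collection<Integer> freeBlocks) {
        return new ImmutablePoolSketch(new TreeSet<>(freeBlocks));
    }

    boolean isFree(int block) {
        return free.contains(block);
    }

    // Returns a new pool; this instance is left untouched.
    ImmutablePoolSketch allocate(int block) {
        if (!free.contains(block)) {
            throw new IllegalStateException("block " + block + " is not free");
        }
        // Shallow copy of the whole set: the O(n) cost paid on every allocation.
        TreeSet<Integer> copy = new TreeSet<>(free);
        copy.remove(block);
        return new ImmutablePoolSketch(copy);
    }

    public static void main(String[] args) {
        ImmutablePoolSketch pool = ImmutablePoolSketch.of(java.util.Arrays.asList(1, 2, 3));
        ImmutablePoolSketch newPool = pool.allocate(2);
        System.out.println(pool.isFree(2));    // true
        System.out.println(newPool.isFree(2)); // false
    }
}
```

A persistent sorted set would let `allocate` share most of its structure with the old pool instead of copying every element.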

Wednesday, March 31, 2010

Load Balancing Actors with Work Stealing Techniques

Actor concurrency is well known to provide a more convenient model for concurrency than traditional thread based approaches. In contrast with threads, you can have as many actors as you want (if memory permits) and the actor implementation will take care of processing actors on threads whenever necessary behind the scenes.

In Akka, which among other things provides actors for the JVM (in Scala), the component which schedules actors for processing on a set of threads is called the "dispatcher". The default dispatcher uses a fixed size thread pool to process actors on. When a message is sent to an actor, a task is scheduled on the thread pool to process all the messages in the mailbox of the actor.

What actor concurrency does not provide out of the box is load balancing among equal actors ("equal" meaning they are both capable of processing the same types of messages).

You can implement load balancing on top of actor frameworks (example in Akka) by sending messages to a set of actors in some clever way (round robin, always choosing the one with the smallest mailbox, ...), but this has a few limitations. This kind of load balancing is rather "static": once a message is sent to an actor, it stays there until that actor processes it. If another actor is already done, it can never help the slow actor with messages that have already been distributed.

Here comes work stealing.

Work stealing is an idea used for example in the Fork/Join framework. Applied to actors, it implies that idle actors can "steal work" from the tail of the mailbox of busy actors. Of course actors are not threads, so "idle actors" cannot really do anything because they are not assigned to a thread as long as they stay idle. This is why the implementation is a bit more subtle than the conceptual idea.

I have implemented a work stealing dispatcher for Akka actors. Although it's called "work stealing", the implementation actually behaves more like "work donating" because the victim actor takes the initiative: it donates work to its thief, rather than having the thief steal work from the victim.

There is some cleverness in the dispatcher implementation: it uses ReentrantLock.tryLock to check whether an actor that is being scheduled on the dispatcher can actually process its mailbox (another thread might already be busy doing so). If the actor cannot process messages for this reason, it selects a thief (round robin), donates a message from the tail of its mailbox (a ConcurrentLinkedDeque, using Deque.pollLast) to the thief, and processes that message in the thief.

Using the victim's thread to do the "work donating", at a moment when that thread cannot process the victim's mailbox itself because another thread is already doing so, implies that no threads are wasted on the work stealing process. Furthermore, when selecting a thief, the algorithm makes sure that the thief is idle, so that no messages are redistributed to an actor which cannot process them immediately. This also prevents stolen messages from being stolen again by another actor later on.
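
The donating step can be sketched in plain Java as follows. This is a simplified illustration under stated assumptions, not the Akka dispatcher code: `WorkDonatingSketch`, `SketchActor`, `dispatch` and `demo` are hypothetical names, the round robin thief selection is left out (a single thief is passed in), and "idle" is approximated by the thief's lock being free.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of "work donating"; not the Akka dispatcher implementation.
final class WorkDonatingSketch {

    static final class SketchActor {
        final ConcurrentLinkedDeque<String> mailbox = new ConcurrentLinkedDeque<>();
        final ReentrantLock lock = new ReentrantLock();
        final List<String> processed = new CopyOnWriteArrayList<>();

        void process(String message) {
            processed.add(message);
        }
    }

    // Called by a pool thread that was scheduled to run the victim's mailbox.
    static void dispatch(SketchActor victim, SketchActor thief) {
        if (victim.lock.tryLock()) {
            try {
                // Nobody else is processing this mailbox: drain it from the head.
                String message;
                while ((message = victim.mailbox.pollFirst()) != null) {
                    victim.process(message);
                }
            } finally {
                victim.lock.unlock();
            }
        } else if (thief.lock.tryLock()) {
            // The victim is busy on another thread and the thief is idle:
            // donate one message from the tail and process it in the thief.
            try {
                String donated = victim.mailbox.pollLast();
                if (donated != null) {
                    thief.process(donated);
                }
            } finally {
                thief.lock.unlock();
            }
        }
    }

    // Simulates a busy victim by holding its lock on a second thread.
    static List<String> demo() {
        SketchActor victim = new SketchActor();
        SketchActor thief = new SketchActor();
        victim.mailbox.add("m1");
        victim.mailbox.add("m2");
        CountDownLatch locked = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread busy = new Thread(() -> {
            victim.lock.lock();
            try {
                locked.countDown();
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                victim.lock.unlock();
            }
        });
        try {
            busy.start();
            locked.await();           // wait until the victim really is "busy"
            dispatch(victim, thief);  // this thread cannot lock the victim, so it donates
            release.countDown();
            busy.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return thief.processed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [m2]
    }
}
```

Taking the message from the tail with `pollLast` keeps the donating thread away from the head of the deque, where the thread that owns the victim's lock is consuming.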

To use it, simply assign the work stealing dispatcher to your Akka actors:

val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")

class WorkstealingActor extends Actor {
  messageDispatcher = dispatcher
  def receive = {
    case test => {/* process message */}
  }
}

When you create multiple instances of this actor, they'll all share the same work stealing dispatcher.

Monday, February 15, 2010

Automatically reload log4j configuration in tomcat

I was looking for a way to have Log4j reload its configuration file automatically when it changes, in Tomcat. Log4j can do this with the configureAndWatch method, but the default initialization procedure (simply putting a file in the classpath) doesn't use configureAndWatch. You have to write at least a little bit of code to get Log4j to do this. I found the easiest solution for integration with Tomcat to be to implement a Tomcat LifecycleListener.

import org.apache.catalina.Lifecycle;
import org.apache.catalina.LifecycleEvent;
import org.apache.catalina.LifecycleListener;
import org.apache.log4j.LogManager;
import org.apache.log4j.PropertyConfigurator;

public class Log4JInitializer implements LifecycleListener {
    private String propertiesFile;

    public String getPropertiesFile() {
        return this.propertiesFile;
    }

    public void setPropertiesFile(String propertiesFile) {
        this.propertiesFile = propertiesFile;
    }

    public void lifecycleEvent(LifecycleEvent event) {
        if (Lifecycle.BEFORE_START_EVENT.equals(event.getType())) {
            initializeLog4j();
        }
    }

    private void initializeLog4j() {
        // configure from file, and let log4j monitor the file for changes
        PropertyConfigurator.configureAndWatch(propertiesFile);

        // shutdown log4j (and its monitor thread) on shutdown
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                LogManager.shutdown();
            }
        });
    }
}

I simply listen for the "BEFORE_START_EVENT", and when it happens (which is once per Tomcat startup) I initialize Log4j using the configureAndWatch method. I also install a shutdown hook to clean up the thread Log4j creates to poll the configuration file for changes (I could also have chosen to listen for the "AFTER_STOP_EVENT" from Tomcat instead).

Package this in a jar, put it on the Tomcat classpath, and now you can configure it in your Tomcat server.xml.

  <Listener className="Log4JInitializer" propertiesFile="/path/to/"/>

Can't be much easier, and it does what it has to do.

Wednesday, January 20, 2010

Performance tuning tools for a multi core world

Recently I was performance tuning a Java application on a quad core Intel Xeon X5560. This processor has Hyper-Threading, so it shows up in the operating system as if it had 8 cores.

My application was slower than I wanted it to be, and I was faced with this vmstat output:
procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu------
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 3  0 1048660 209840 199608 4123568    0    0   150    24 1577 2599 13  9 77  1  0
 2  0 1048660 210220 199612 4123712    0    0   104    82 1593 2891 13 10 75  2  0
 1  1 1048660 209260 199628 4124596    0    0   484   210 1537 2567 10  7 77  6  0
 1  1 1048660 213940 199612 4120184    0    0  1232    32 1548 2694  9  7 76  8  0

At first sight, I thought it looked ok. That's between 9 and 13% CPU time in user space, between 7 and 10% in system space, and 75 to 77% idle. So it looked like my application was not using much of the CPU at all...

My application was still slower than I wanted it to be, so I went on looking for reasons why it was not using more of the CPU. I took some thread dumps and used the YourKit profiler to measure the time threads were waiting for locks. I couldn't find anything.

Then I had a look at what "top" had to say, instead of using vmstat. It showed comparable numbers:
Cpu(s): 14.6%us, 10.5%sy,  0.0%ni, 73.2%id,  1.2%wa,  0.0%hi,  0.5%si,  0.0%st

But it says "Cpu(s)", so I hit "1" to get a view per CPU... now things started to get interesting:
Cpu0  : 12.7%us,  8.7%sy,  0.0%ni, 73.3%id,  3.3%wa,  0.0%hi,  2.0%si,  0.0%st
Cpu1  : 13.2%us,  8.3%sy,  0.0%ni, 77.5%id,  0.7%wa,  0.0%hi,  0.3%si,  0.0%st
Cpu2  : 14.3%us,  8.3%sy,  0.0%ni, 77.3%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu3  : 18.0%us,  9.7%sy,  0.0%ni, 70.7%id,  0.7%wa,  0.0%hi,  1.0%si,  0.0%st
Cpu4  : 18.0%us, 17.3%sy,  0.0%ni, 64.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu5  : 15.3%us, 12.0%sy,  0.0%ni, 72.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu6  : 11.6%us,  8.9%sy,  0.0%ni, 79.5%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu7  : 13.3%us,  9.0%sy,  0.0%ni, 77.3%id,  0.3%wa,  0.0%hi,  0.0%si,  0.0%st

Now of course, the problem was staring me in the face. The CPU times reported by vmstat and top (and many other tools) are relative to the total number of CPUs. With 8 cores, this means that a single thread using 100% of one CPU shows up as only 12.5%. With this in mind, the numbers of vmstat start to make sense: roughly one CPU is fully busy in user space, and another CPU is almost fully busy in system space. In other words, my application would probably be faster if it did less I/O and more concurrent processing.

It also means that, with more and more CPUs, tools like vmstat are becoming less useful. With only one CPU, a single busy thread can move the reading anywhere from 0 to 100. With 8 cores, that scale shrinks to 0 to 12.5. The granularity of the vmstat output is 1 percentage point, so with 100 cores the difference between some random "noise" on an otherwise idle machine and a machine that is idle except for 1 fully used core becomes invisible.
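
The arithmetic above is easy to make explicit. The helper below is only an illustration (`CpuScale`, `busyCores` and `percentForOneCore` are hypothetical names); it converts an aggregate percentage, as reported by vmstat, into the number of cores' worth of load it represents.

```java
// Hypothetical helper: relates an aggregate CPU percentage to whole cores.
final class CpuScale {

    // How many cores' worth of work an aggregate percentage represents.
    static double busyCores(double aggregatePercent, int cpuCount) {
        return aggregatePercent / 100.0 * cpuCount;
    }

    // The aggregate percentage shown when exactly one core is saturated.
    static double percentForOneCore(int cpuCount) {
        return 100.0 / cpuCount;
    }

    public static void main(String[] args) {
        System.out.println(percentForOneCore(8));   // 12.5
        System.out.println(percentForOneCore(100)); // 1.0
        // 13% user time on 8 logical CPUs is roughly one fully busy core.
        System.out.println(busyCores(13, 8));
    }
}
```

With 100 cores, one saturated core moves the aggregate reading by a single percentage point, which is exactly the resolution of the vmstat output.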

It's not only vmstat of course. The CPU profiling output in YourKit, for example, has the same problem: the graph stays around 12.5%.

So to conclude:

I think we might need to update our profiling tools. We can also add new tools to our profiling toolbox. I experimented a little with mpstat for example, which gives much of the same information as vmstat, but does it per CPU.

And of course, as we all already know, we also have to update our applications. Concurrency is not going away any time soon.