Programming

Improving the user geolocation cache with Hazelcast

In the last post, we saw how to cache user geolocation data in Hazelcast and search for nearby users. That worked well. However, as soon as you cache a lot of data, you will find that searching for nearby users is quite slow. What went wrong?

As you may remember, we implemented a GeoDistancePredicate that checks whether a user is within the distance limit from a specific point. The way this predicate works is that, for every entry cached in memory, Hazelcast invokes the apply() method to see if the entry satisfies the distance criterion. If you have cached a large data set, the query loops through every entry in the whole data set, one by one. This is basically an O(n) scan; there's no way it can be fast on a large data set.

We need to find a better way to quickly search for nearby users, using only the mechanisms available in Hazelcast.

Hazelcast defines two kinds of predicates: the plain Predicate, and the IndexAwarePredicate. As the name implies, the second interface uses the attribute indexes of your cached objects to speed up queries. In the previous post, our GeoDistancePredicate only implemented the Predicate interface, so every query had to scan through the whole data set to get the right results. In this post, we are going to change the implementation to take advantage of indexing, which should significantly improve search performance.

Before we can use indexes for searching, we have to tell Hazelcast which attributes to index. Remember that in the last post we defined a class called MyCachedUser. This simple class has three attributes, but when we search for nearby users, we are only interested in the user's location, namely the latitude and longitude coordinates. Therefore, we want to build indexes on these two attributes to speed up searches.

At startup, we need to tell Hazelcast that we want these attributes indexed:

    HazelcastInstance hz = Hazelcast.newHazelcastInstance();
    IMap<Object, Object> map = hz.getMap("users");
    map.addIndex("latitude", true);
    map.addIndex("longitude", true);

This tells Hazelcast to index the two attributes. The second parameter set to true means the index is ordered (sorted), which is what range searches need.
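
Alternatively, if you configure your maps programmatically anyway (as the later posts here do), the same indexes can be declared up front on the MapConfig. This is a sketch against the Hazelcast 3.x API (MapIndexConfig); verify the names against your version:

    Config cfg = new Config();
    MapConfig mapConfig = new MapConfig();
    mapConfig.setName("users");
    // true = ordered index, suitable for range queries such as BETWEEN
    mapConfig.addMapIndexConfig(new MapIndexConfig("latitude", true));
    mapConfig.addMapIndexConfig(new MapIndexConfig("longitude", true));
    cfg.addMapConfig(mapConfig);
    HazelcastInstance hz = Hazelcast.newHazelcastInstance(cfg);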

Now that we have the indexes in place, we can use them to limit the search space to a specific range. Given a point, we want to draw a circle with that point as the center and the distance limit as the radius, and restrict our search to the circle. Before we can do that, we need to figure out how to draw the circle, then find all users whose current location falls within it.

However, a circle does not let us take advantage of the latitude/longitude indexes. It is much easier to draw a square first and limit the search ranges to that square, knowing that the circle fits inside it. Once we have found all users within the square, we eliminate those who are not within the circle, namely the users in the four corners of the square.

To draw the square, we need a point to the north of the central point, one to the east, one to the south, and one to the west, each at a distance equal to the distance limit we want to search for.

For each of these four points, we have the starting point, the bearing, and the distance. The formula for finding the destination point is well known, so I'm not going into all the details. The Java implementation follows:

	public static GeoCoordinate fromBearingDistance(double lat, double lon, double bearing, double d)
	{
		/*
		 * φ2 = asin( sin(φ1)*cos(d/R) + cos(φ1)*sin(d/R)*cos(θ) )
		 * λ2 = λ1 + atan2( sin(θ)*sin(d/R)*cos(φ1), cos(d/R)−sin(φ1)*sin(φ2))
		 */
		double lat1 = Math.toRadians(lat);
		double lon1 = Math.toRadians(lon);
		double brng = Math.toRadians(bearing);
		double lat2 = Math.asin(Math.sin(lat1) * Math.cos(d / EARTH_RADIUS)
				+ Math.cos(lat1) * Math.sin(d / EARTH_RADIUS) * Math.cos(brng));
		double lon2 = lon1
				+ Math.atan2(
						Math.sin(brng) * Math.sin(d / EARTH_RADIUS)
								* Math.cos(lat1), Math.cos(d / EARTH_RADIUS)
								- Math.sin(lat1) * Math.sin(lat2));
		return new GeoCoordinate(Math.toDegrees(lat2), Math.toDegrees(lon2));
	}

The coordinates are provided in degrees, but the formula works in radians, so we convert to radians first and convert the result back to degrees. The GeoCoordinate class is defined as:

public class GeoCoordinate implements Portable
{
	public static final String KEY_LATITUDE = "latitude";
	public static final String KEY_LONGITUDE = "longitude";

	private double latitude;
	private double longitude;

	public GeoCoordinate()
	{
	}
	
	public GeoCoordinate(double lat, double lng)
	{
		this.latitude = lat;
		this.longitude = lng;
	}
	
	public double getLatitude()
	{
		return latitude;
	}
	
	public void setLatitude(double lat)
	{
		this.latitude = lat;
	}
	
	public double getLongitude()
	{
		return longitude;
	}
	public void setLongitude(double lng)
	{
		this.longitude = lng;
	}
	
	@Override
	public int getFactoryId()
	{
		return CachedObjectFactory.FACTORY_ID;
	}

	@Override
	public int getClassId()
	{
		return CachedObjectFactory.TYPE_GEOCOORDINATE;
	}

	@Override
	public void writePortable(PortableWriter writer) throws IOException
	{
		writer.writeDouble(KEY_LATITUDE, latitude);
		writer.writeDouble(KEY_LONGITUDE, longitude);
	}

	@Override
	public void readPortable(PortableReader reader) throws IOException
	{
		latitude = reader.readDouble(KEY_LATITUDE);
		longitude = reader.readDouble(KEY_LONGITUDE);
	}

	@Override
	public String toString()
	{
		return "lat=" + latitude + ";lng=" + longitude;
	}
}

Now it's time to revisit our GeoDistancePredicate implementation. As we said earlier, we need to make this predicate aware of the indexes. We can implement the IndexAwarePredicate interface directly, or derive from the class AbstractPredicate, which already implements it. Without further ado, here is the modified version of GeoDistancePredicate:

public class GeoDistancePredicate extends AbstractPredicate
{
	private double latitude;
	private double longitude;
	private double distance;
	private double latFloor;
	private double latCeiling;
	private double lngFloor;
	private double lngCeiling;
	
	public GeoDistancePredicate()
	{
		super("latitude");
	}
	
	public GeoDistancePredicate(double lat, double lng, double dist)
	{
		super("latitude");
		this.latitude = lat;
		this.longitude = lng;
		this.distance = dist;

		init();
	}
	
	private void init()
	{
		GeoCoordinate c = GeoUtil.fromBearingDistance(latitude, longitude, GeoUtil.NORTH, distance);
		latCeiling = c.getLatitude();
		c = GeoUtil.fromBearingDistance(latitude, longitude, GeoUtil.SOUTH, distance);
		latFloor = c.getLatitude();
		c = GeoUtil.fromBearingDistance(latitude, longitude, GeoUtil.EAST, distance);
		lngCeiling = c.getLongitude();
		c = GeoUtil.fromBearingDistance(latitude, longitude, GeoUtil.WEST, distance);
		lngFloor = c.getLongitude();
	}
	
	@Override
	public void readData(ObjectDataInput in) throws IOException
	{
		super.readData(in);
		latitude = in.readDouble();
		longitude = in.readDouble();
		distance = in.readDouble();

		init();
	}

	@Override
	public void writeData(ObjectDataOutput out) throws IOException
	{
		super.writeData(out);
		out.writeDouble(latitude);
		out.writeDouble(longitude);
		out.writeDouble(distance);
	}

	@Override
	public boolean apply(Entry entry)
	{
		boolean res = false;
		Object obj = entry.getValue();
		if (obj instanceof MyCachedUser)
		{
			MyCachedUser u = (MyCachedUser) obj;
			double dist = GeoUtil.getDistance(latitude, longitude, u.getLatitude(), u.getLongitude());
			res = (dist <= distance);
		}
		return res;
	}

	@Override
	public Set<QueryableEntry> filter(QueryContext queryContext)
	{
		// First pass: use the sorted indexes to restrict the candidates to the
		// bounding square around the central point.
		String sql = "latitude BETWEEN " + latFloor + " AND " + latCeiling
				+ " AND longitude BETWEEN " + lngFloor + " AND " + lngCeiling;
		SqlPredicate sqlPred = new SqlPredicate(sql);
		Set<QueryableEntry> entries = sqlPred.filter(queryContext);
		// Second pass: drop the candidates in the corners of the square,
		// i.e. those farther away than the distance limit.
		Set<QueryableEntry> endList = new HashSet<QueryableEntry>();
		for (QueryableEntry e : entries)
		{
			Object v = e.getValue();
			if (v instanceof MyCachedUser)
			{
				MyCachedUser u = (MyCachedUser) v;
				double dist = GeoUtil.getDistance(latitude, longitude, u.getLatitude(), u.getLongitude());
				if (dist <= distance)
				{
					endList.add(e);
				}
			}
		}
		return endList;
	}

}

As you can see, given a central point, we find the points to the north, east, south and west of it. Using the coordinates of these four points to form the bounding square, we then limit the search space to that square. The bearings are defined as:

	public static final double NORTH = 0.0d;
	public static final double EAST = 90.0d;
	public static final double SOUTH = 180.0d;
	public static final double WEST = 270.0d;

In this new implementation, the apply() method is no longer the workhorse; instead, Hazelcast calls the filter() method. There, we use BETWEEN and AND conditions, which Hazelcast resolves through the sorted indexes, to find all users within the square, then we filter out those whose current location is not within the circle.
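
Note that the application logic does not change at all; the call site looks exactly as in the previous post:

	Predicate<String, MyCachedUser> predicate = new GeoDistancePredicate(lat, lng, dist);
	Collection<MyCachedUser> usersFound = cachedUserMap.values(predicate);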

That's it. With this modification, we go from a full O(n) scan to an index range lookup of roughly O(log n) plus a filter over the remaining candidates, which should be significantly faster than the previous implementation.

Programming

How to cache user geolocation with Hazelcast and search for nearby users

Hazelcast is great as a distributed object cache: it provides all the infrastructure of an in-memory key/value store for caching any type of object. It even provides query features for searching on object attributes, albeit quite primitive compared to what a relational database offers. But it does the job. And when it doesn't, you can extend it.

Let's say you are developing some kind of social app, and you are caching user information in Hazelcast, along with each user's geolocation. Now you probably want to search for nearby users who are within a certain distance. How could you run a query like that in Hazelcast?

This post is going to describe how to search for nearby users in your Hazelcast cache, by implementing a new query predicate.

To keep things simple in the example below, let's assume that you define your MyCachedUser class as follows:

public class MyCachedUser implements Serializable
{
	private static final long serialVersionUID = -2253075711571121144L;
	private String id = null;
	private double latitude;
	private double longitude;
	
	public MyCachedUser()
	{
		
	}
	
	public MyCachedUser(String id, double latitude, double longitude)
	{
		this.id = id;
		this.latitude = latitude;
		this.longitude = longitude;
	}
	
	public String getId()
	{
		return id;
	}
	public void setId(String id)
	{
		this.id = id;
	}
	public double getLatitude()
	{
		return latitude;
	}
	public void setLatitude(double l)
	{
		this.latitude = l;
	}
	public double getLongitude()
	{
		return longitude;
	}
	public void setLongitude(double l)
	{
		this.longitude = l;
	}
}

That is a very simple class for caching user information. We only have an ID and the coordinates, latitude and longitude. Obviously, instead of implementing the default Java Serializable interface, you can also use any of the serialization mechanisms provided by Hazelcast, or any third-party serialization framework.

Before we go further, we need to be able to calculate the distance between two points given their coordinates. I am not going into the details of the math; you can find the formula used here (the haversine formula) and a detailed explanation on Wikipedia. Here's a simple Java implementation:

public class GeoUtil
{
	public static final double EARTH_RADIUS = 6371.0d;
	
	public static double getDistance(double lat1, double lng1, double lat2, double lng2)
	{
		double dLat = toRadian(lat2 - lat1);
		double dLng = toRadian(lng2 - lng1);

		double a = Math.pow(Math.sin(dLat / 2), 2) + Math.cos(toRadian(lat1))
				* Math.cos(toRadian(lat2)) * Math.pow(Math.sin(dLng / 2), 2);

		double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));

		return EARTH_RADIUS * c; // returns result in kilometers
	}

	public static double toRadian(double degrees)
	{
		return (degrees * Math.PI) / 180.0d;
	}
}
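
As a quick sanity check of this utility (my example, not part of the original code): the distance between (0°, 0°) and (0°, 1°) is one degree of longitude along the equator, which should come out to 2π · 6371 / 360 ≈ 111.19 km:

	double d = GeoUtil.getDistance(0.0, 0.0, 0.0, 1.0);
	System.out.println(d); // prints roughly 111.19 (kilometers)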

Now that we have the required tools in hand, we can look at how to search for nearby users in Hazelcast. The built-in query features are not enough for sophisticated searches like this, but we can implement our own predicate. Let's implement one called GeoDistancePredicate, which computes the distance between two points while the search is performed:

public class GeoDistancePredicate implements Predicate<String, MyCachedUser>, DataSerializable
{
	private double latitude;
	private double longitude;
	private double distance;
	
	public GeoDistancePredicate()
	{
	}
	
	public GeoDistancePredicate(double lat, double lng, double dist)
	{
		this.latitude = lat;
		this.longitude = lng;
		this.distance = dist;
	}
	

	@Override
	public void readData(ObjectDataInput in) throws IOException
	{
		latitude = in.readDouble();
		longitude = in.readDouble();
		distance = in.readDouble();
	}

	@Override
	public void writeData(ObjectDataOutput out) throws IOException
	{
		out.writeDouble(latitude);
		out.writeDouble(longitude);
		out.writeDouble(distance);
	}

	@Override
	public boolean apply(Entry<String, MyCachedUser> entry)
	{
		MyCachedUser u = entry.getValue();
		double dist = GeoUtil.getDistance(latitude, longitude, u.getLatitude(), u.getLongitude());
		return dist <= distance;
	}

}

This class implements the Predicate interface. We pass in the GPS coordinates of the center point and the distance limit. As you can see, the most important code is in the apply() method, which is called for each entry to check whether a user is nearby, i.e. within the defined distance.

This class also implements the DataSerializable interface, because when you perform a search, the predicate object is sent to where the cached data lives, which might be a different machine. Again, you can use any serialization mechanism, as long as your object can be serialized and deserialized back.

Now that we have another piece of the puzzle in place, we can query nearby users like this:

	Predicate<String, MyCachedUser> predicate = new GeoDistancePredicate(lat, lng, dist);
	Collection<MyCachedUser> usersFound = cachedUserMap.values(predicate);

That’s it. Isn’t this neat? You get back a list of users whose current location is within the defined distance.

OK, but what if we want not only the list of users, but also the distance to each user? For this we need to do extra work, as there's no easy way to plug that requirement into the Hazelcast predicate and the returned list. So we have to calculate the distance (again!) once we have the user list. Let's create another class called DistancedCachedUser, which contains the distance value along with the cached user information:

public class DistancedCachedUser implements Comparable<DistancedCachedUser>
{
	private MyCachedUser user;
	private double distance;
	
	public DistancedCachedUser(MyCachedUser user, double distance)
	{
		this.user = user;
		this.distance = distance;
	}
	
	public double getDistance()
	{
		return distance;
	}
	
	public MyCachedUser getUser()
	{
		return user;
	}

	@Override
	public int compareTo(DistancedCachedUser other)
	{
		// Sort by distance, closest first
		return Double.compare(this.distance, other.distance);
	}
}

This class also implements the Comparable interface, as we might want to sort the returned user list from the closest to the farthest.

Finally, here is the method to search for nearby users and return a list with the distance value for each user:

	public Collection<DistancedCachedUser> getNearbyUsers(double lat, double lng, double dist)
	{
		ArrayList<DistancedCachedUser> list = new ArrayList<DistancedCachedUser>();
		Predicate<String, MyCachedUser> predicate = new GeoDistancePredicate(lat, lng, dist);

		Collection<MyCachedUser> usersFound = cachedUserMap.values(predicate);
		for (MyCachedUser u : usersFound)
		{
			DistancedCachedUser cu = new DistancedCachedUser(u, GeoUtil.getDistance(lat, lng, u.getLatitude(), u.getLongitude()));
			list.add(cu);
		}
		Collections.sort(list);
		return list;
	}

That's all. The last step is not as elegant and efficient as we might want, but it does the job.

Try it out; you can do a lot more by implementing your own predicates to perform the searches you need.

This is great. However, as soon as you have a lot of cached data, you will find that searching for nearby users is quite slow. What happened? In the next post, I'll show how to improve on this.

Programming

How to implement off-heap memory storage for Hazelcast

In the last post, I did a small comparison of on-heap and off-heap memory storage of Hazelcast for byte arrays and plain Java objects. The benchmark methodology was far from being scientific, as it was meant for my own understanding of the Hazelcast internals and the effectiveness of my own implementation of off-heap memory management mechanism.

Hazelcast is a very elegant framework, and with an off-heap storage to get around the Java GC cycles, it is an excellent framework for distributed big data caching.

In this post, I'm going to provide some instructions on how to create your own off-heap storage implementation for Hazelcast. It is assumed that you are familiar with Java programming, as well as memory management concepts in general; if you have forgotten what you learned in your CS classes, you might want to dust off some textbooks now. It is also assumed that you know how to allocate off-heap memory in Java. There is already plenty of information about that on the internet, and it is beyond the scope of this post anyway.

This post is not intended to give you a fully implemented off-heap storage; it is meant to walk you through the necessary preparation work, should you need to implement your own version. However, if you have the budget, getting the Hazelcast enterprise license is probably the safer route.

Let's recall what I described in the last post about the Hazelcast internals. Hazelcast provides a set of built-in data structures, such as map, queue, list, etc. When we are dealing with one of these data structures, say a map, which is a key/value store, the implementation logic is managed by Hazelcast, and that includes managing the key set, the eviction policy, and so on. The off-heap storage, when enabled, is used only for storing data contents. So for a key/value pair, the key part is managed in heap memory, while the value part is stored in off-heap memory. This clean separation of data structure logic from data storage lets you swap the storage implementation easily.

Hazelcast is meant to run as a cluster. At startup, each node in the cluster initializes its own environment, including its memory storage model, before joining the cluster. And this is where we can plug in our own initialization code to swap in our own off-heap storage.

First of all, you need to create a node initializer class that implements the com.hazelcast.instance.NodeInitializer interface:

public interface NodeInitializer {

    void beforeInitialize(Node node);

    void printNodeInfo(Node node);

    void afterInitialize(Node node);

    SecurityContext getSecurityContext();

    Storage getOffHeapStorage();

    void destroy();
}

The three important methods to implement are beforeInitialize(Node node), afterInitialize(Node node) and getOffHeapStorage(). These are our entry points for plugging in our own implementation. The method beforeInitialize() is where you initialize your off-heap memory (and probably do a bunch of other preparation work), and getOffHeapStorage() is how Hazelcast obtains a handle on your off-heap storage for reading and writing data.

The open source version of Hazelcast has a DefaultNodeInitializer already, so it is probably better to just extend it and override only the necessary methods, such as:

public class RenzhiNodeInitializer extends DefaultNodeInitializer
{
	@Override
	public void beforeInitialize(Node node)
	{
		// Implementation code here
	}

	@Override
	public Storage getOffHeapStorage()
	{
		// Implementation code here
	}
}

One last thing before we go further: you need to create a file

/META-INF/services/NodeInitializer

and put your own node initializer class name in that file, such as:

ca.renzhi.hazelcast.RenzhiNodeInitializer

OK, so that gets our foot in the door. The next thing to do is to get familiar with some of the configuration parameters for off-heap memory in Hazelcast. From the source code in my last post, we can see the following configuration:

Config cfg = new Config();
cfg.setProperty("hazelcast.elastic.memory.enabled", "true");
cfg.setProperty("hazelcast.elastic.memory.total.size", "10G");
cfg.setProperty("hazelcast.elastic.memory.chunk.size", "1K");
cfg.setProperty("hazelcast.elastic.memory.unsafe.enabled", "true");

MapConfig mapConfig = new MapConfig();
mapConfig.setInMemoryFormat(InMemoryFormat.OFFHEAP);
mapConfig.setBackupCount(0);
mapConfig.setEvictionPolicy(EvictionPolicy.LRU);
mapConfig.setEvictionPercentage(2);
MaxSizeConfig mxSizeCfg = new MaxSizeConfig(Constants.MAX_SIZE, MaxSizeConfig.MaxSizePolicy.PER_NODE);
mapConfig.setMaxSizeConfig(mxSizeCfg);
mapConfig.setName("TestCache");

cfg.addMapConfig(mapConfig);
HazelcastInstance hz = Hazelcast.newHazelcastInstance(cfg);
Map<String, Customer> map = hz.getMap("TestCache");

There are quite a few configuration parameters here, but the most important to understand are hazelcast.elastic.memory.total.size, hazelcast.elastic.memory.chunk.size and InMemoryFormat.OFFHEAP.

Just enabling the “elastic memory” model does not mean that Hazelcast will use off-heap memory; you also need to set InMemoryFormat.OFFHEAP to tell Hazelcast that you really want to do that.

Of the other two parameters, only the total size is actually important. This is the amount of memory we want to allocate during the initialization phase at startup. Remember the method beforeInitialize()? That's the place where we want to initialize the memory.

The chunk size parameter is useful only to the extent that your own off-heap implementation uses it. Depending on how you implement it, you might not need this parameter at all, but it gives you a hint about how Hazelcast assumes you will implement the storage. This is where your knowledge of memory management, algorithms and so on comes into play. You might want to brush up on these concepts from your CS textbooks.

In computer science textbooks, system memory is presented as one big, contiguous array, but internally it is divided into small segments of equal size, called pages. Hazelcast uses the same concept. At startup, you allocate a big block of system memory, then divide it into small segments of equal size, called chunks. That's why the Hazelcast documentation insists that the total memory size must be a multiple of the chunk size.

Whether it's called a page or a chunk, it is the same concept. The basic idea is to divide the memory into small slots where you write your data. Depending on the chunk size you set, your object might or might not fit into one slot. If the object is smaller than the chunk size, you waste the portion of the chunk that is not filled with data. If it is larger than the chunk size, it has to be cut into pieces that fit into chunks, and will take more than one chunk to store.

So the idea is to divide your memory into small chunks, then write your data into these slots. An object might be written into more than one slot, and the slots might not be adjacent to each other. So when you read the object back, you need to figure out where to read from, and in what order, to re-assemble the chunks into the original object.
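
To make this concrete, here is a minimal sketch of such a chunk pool, assuming a single direct ByteBuffer as the backing memory (which caps it at 2 GB; a real implementation would manage multiple buffers or raw native addresses). All names here are hypothetical:

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class ChunkPool
{
	private final ByteBuffer pool;
	private final int chunkSize;
	private final Deque<Integer> freeChunks = new ArrayDeque<Integer>();

	ChunkPool(int totalSize, int chunkSize)
	{
		this.chunkSize = chunkSize;
		this.pool = ByteBuffer.allocateDirect(totalSize);
		for (int i = 0; i < totalSize / chunkSize; i++)
			freeChunks.push(i);
	}

	// Writes data across as many chunks as needed; returns the chunk indices.
	// The caller must keep them, in order, to read the value back or free it.
	int[] write(byte[] data)
	{
		int n = (data.length + chunkSize - 1) / chunkSize;
		int[] chunks = new int[n];
		for (int i = 0; i < n; i++)
		{
			int chunk = freeChunks.pop(); // throws when the pool is exhausted
			chunks[i] = chunk;
			int offset = i * chunkSize;
			int len = Math.min(chunkSize, data.length - offset);
			ByteBuffer slot = pool.duplicate();
			slot.position(chunk * chunkSize);
			slot.put(data, offset, len);
		}
		return chunks;
	}

	// Re-assembles a value from its chunks, in order.
	byte[] read(int[] chunks, int length)
	{
		byte[] out = new byte[length];
		for (int i = 0; i < chunks.length; i++)
		{
			int offset = i * chunkSize;
			int len = Math.min(chunkSize, length - offset);
			ByteBuffer slot = pool.duplicate();
			slot.position(chunks[i] * chunkSize);
			slot.get(out, offset, len);
		}
		return out;
	}

	void free(int[] chunks)
	{
		for (int c : chunks)
			freeChunks.push(c);
	}
}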

There you have it. That’s the basic idea behind the implementation of an off-heap storage for Hazelcast. How you implement it is up to you now.

Now, let’s look at what you need to create your off-heap storage class. From the signature of the method getOffHeapStorage(), you are expected to return an object of type Storage, which is defined as:

public interface Storage {

    REF put(int hash, Data data);

    Data get(int hash, REF ref);

    void remove(int hash, REF ref);

    void destroy();
}

Therefore, the off-heap storage class must implement this interface, such as:

class RenzhiOffHeapStorage implements Storage
{
    // Implementation code
}

The off-heap storage is, basically, a key/value store. The put() method is called when an object is written into the storage, and the get() method when an object is read back. The Storage interface is straightforward enough that it needs little explanation, especially after the memory management discussion above.
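
To connect the pieces, wiring the chunk pool from the sketch above into this interface might look roughly like the following. I am assuming here that Storage is generic in REF; the two lines touching the Data API are placeholders to adapt to the Hazelcast version you build against:

// Our own reference type: remembers where a value's chunks live.
class ChunkRef
{
	final int[] chunks; // indices into the pool
	final int length;   // exact value length in bytes

	ChunkRef(int[] chunks, int length)
	{
		this.chunks = chunks;
		this.length = length;
	}
}

class RenzhiOffHeapStorage implements Storage<ChunkRef>
{
	private final ChunkPool pool = new ChunkPool(1 << 30, 1024); // 1 GB pool, 1 KB chunks

	@Override
	public ChunkRef put(int hash, Data data)
	{
		byte[] bytes = data.getBuffer(); // placeholder: adapt to your Data API
		return new ChunkRef(pool.write(bytes), bytes.length);
	}

	@Override
	public Data get(int hash, ChunkRef ref)
	{
		byte[] bytes = pool.read(ref.chunks, ref.length);
		return new Data(bytes); // placeholder: adapt to your Data API
	}

	@Override
	public void remove(int hash, ChunkRef ref)
	{
		pool.free(ref.chunks);
	}

	@Override
	public void destroy()
	{
		// release the backing direct buffer / native memory here
	}
}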

Now, putting all this together: the parameter InMemoryFormat.OFFHEAP tells Hazelcast that we really want to use off-heap memory for storage, and Hazelcast will call getOffHeapStorage() to get the handle on your off-heap memory. By that time, your NodeInitializer has already done all the preparation work and plugged in your implementation.

That’s it, you should be ready to implement your own version now. But as I said before, if you have the budget, you probably would want to get the commercial license, with technical support. However, if you need some special, custom implementation, you can contact me. For a small fee, I can handle the work ;)

Programming

Unscientific comparison of Hazelcast memory model: on-heap and off-heap

Recently, I was working on a project that required caching a lot of data for fast access, including images and thumbnails as well as plain Java objects. So we set out to find a system that could handle the workload in a distributed manner. As the project is developed in Java, a Java solution was preferred.

One of the options we looked at was Hazelcast. Hazelcast labels itself as the in-memory data grid; whatever that means, to us it means an in-memory cache pool. What is nice about Hazelcast is that it has no external dependencies (besides Log4j), and the clustering feature works nicely out of the box. The API is extremely simple. It took us a total of five minutes to get a cluster of three machines up for testing.

Anyone who has worked with large data sets in Java runs into the garbage collection problem sooner or later, where the Java virtual machine stops the world to clean up and reclaim unused memory. When this happens, it is extremely annoying. Each GC cycle can take tens of seconds, or in some cases, as when we worked with Cassandra, up to two minutes. To get around this problem, vendors offer solutions under different names, be it big memory, elastic memory, or what not, but the common idea is to use off-heap memory to minimize the impact of GC cycles on the system.

Hazelcast, the company that develops and supports Hazelcast, the open source framework, also has a commercial offering for off-heap memory storage, called elastic memory, in their enterprise version. As the project did not have a big budget, yet we still needed a solution, I set out to put together an off-heap storage implementation for the open source version of Hazelcast.

In this post, I'll show some test results comparing the on-heap and off-heap storage models of Hazelcast. Benchmarking is a bitch when you try to get a fair, apples-to-apples comparison. At first, it looks simple: you just swap in a new memory management model, and that should be it. But when you get down to the details, the comparison becomes quite tricky.

In this test, I compared the caching of data in the form of byte arrays (e.g. images) and of Java objects, for both on-heap and off-heap memory storage.

Note that this benchmark is going to be very unscientific. This is just for me to get a rough idea of the performance of my own implementation of off-heap storage. Therefore, you should take any data presented in this test result with a big grain of salt.

Let’s get down to the Java source code first. Here is the main class to start up the test:

package ca.renzhi.hazelcast.test;

public class HazelcastMemTest
{
	public static void printHelp()
	{
		System.out.println("");
		System.out.println("ca.renzhi.hazelcast.test.HazelcastMemTest count [maxMem]");
		System.out.println("");
	}

	public static void testObjectCache(String[] args)
	{
		ObjectCacheTest t = null;
		int count = 0; 
		int maxMem = 0;
		if (args.length == 2)
		{
			count = Integer.parseInt(args[0]);
			maxMem = Integer.parseInt(args[1]);
			t = new ObjectCacheTest(maxMem, count);
		}
		else
		{
			count = Integer.parseInt(args[0]);
			t = new ObjectCacheTest(count);
		}

		t.runTest();
		t.shutdown();		
	}

	public static void testByteArrayCache(String[] args)
	{
		ByteArrayCacheTest t = null;
		int count = 0; 
		int maxMem = 0;
		if (args.length == 2)
		{
			count = Integer.parseInt(args[0]);
			maxMem = Integer.parseInt(args[1]);
			t = new ByteArrayCacheTest(maxMem, count);
		}
		else
		{
			count = Integer.parseInt(args[0]);
			t = new ByteArrayCacheTest(count);
		}

		t.runTest();
		t.shutdown();		
	}
	public static void main(String[] args)
	{
		if (args.length < 1)
		{
			printHelp();
			System.exit(0);
		}

		//testObjectCache(args);
		testByteArrayCache(args);
	}

}

The test driver is pretty simple: it takes from the command line the number of objects to generate and, when testing off-heap memory, a second argument giving the size of the off-heap memory in gigabytes.

And now, the main test for byte array data cache:

package ca.renzhi.hazelcast.test;

import java.util.Iterator;
import java.util.Map;
import java.util.Random;

import com.hazelcast.config.Config;
import com.hazelcast.config.InMemoryFormat;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MaxSizeConfig;
import com.hazelcast.config.MapConfig.EvictionPolicy;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class ByteArrayCacheTest
{
	private Random rand = new Random();
	private int count = 0;
	private int maxMem = 0;
	private HazelcastInstance hz = null;
	private Map<String, byte[]> map = null;

	public ByteArrayCacheTest(int maxMem, int count)
	{
		this.maxMem = maxMem;
		this.count = count;

		System.out.println("Unsafe mode");

		Config cfg = new Config();
		cfg.setProperty("hazelcast.elastic.memory.enabled", "true");
		cfg.setProperty("hazelcast.elastic.memory.total.size", this.maxMem + "G");
		cfg.setProperty("hazelcast.elastic.memory.chunk.size", "1K"); // Default is 1K
		cfg.setProperty("hazelcast.elastic.memory.unsafe.enabled", "true");

		MapConfig mapConfig = new MapConfig();
		mapConfig.setInMemoryFormat(InMemoryFormat.OFFHEAP);
		mapConfig.setBackupCount(0);
		mapConfig.setEvictionPolicy(EvictionPolicy.LRU);
		mapConfig.setEvictionPercentage(2);
		MaxSizeConfig mxSizeCfg = new MaxSizeConfig(Constants.MAX_SIZE, MaxSizeConfig.MaxSizePolicy.PER_NODE);
		mapConfig.setMaxSizeConfig(mxSizeCfg);
		mapConfig.setName("TestCache");

		cfg.addMapConfig(mapConfig);

		hz = Hazelcast.newHazelcastInstance(cfg);
		map = hz.getMap("TestCache");
	}

	public ByteArrayCacheTest(int count)
	{
		this.count = count;

		System.out.println("Heap mode");

		Config cfg = new Config();
		MapConfig mapConfig = new MapConfig();
		mapConfig.setBackupCount(0);
		mapConfig.setEvictionPolicy(EvictionPolicy.LRU);
		mapConfig.setEvictionPercentage(2);
		MaxSizeConfig mxSizeCfg = new MaxSizeConfig(Constants.MAX_SIZE, MaxSizeConfig.MaxSizePolicy.PER_NODE);
		mapConfig.setMaxSizeConfig(mxSizeCfg);
		mapConfig.setName("TestCache");

		cfg.addMapConfig(mapConfig);

		hz = Hazelcast.newHazelcastInstance(cfg);
		map = hz.getMap("TestCache");
	}

	private long initDataForHeap()
	{
		long start = System.currentTimeMillis();
		for (int i = 0; i < count; i++)
		{
			byte[] ba = new byte[10240];
			rand.nextBytes(ba);
			map.put(Integer.toString(i), ba);
		}
		long insertDuration = System.currentTimeMillis() - start;
		return insertDuration;
	}

	private long initDataForUnsafe()
	{
		long start = System.currentTimeMillis();
		byte[] ba = new byte[10240];
		for (int i = 0; i < count; i++)
		{
			rand.nextBytes(ba);
			map.put(Integer.toString(i), ba);
		}
		long insertDuration = System.currentTimeMillis() - start;
		return insertDuration;
	}
	public void runTest()
	{
		System.out.println("Running test....");
		long insertDuration;
		if (maxMem > 0)
			insertDuration = initDataForUnsafe();
		else
			insertDuration = initDataForHeap();
		int size = map.size();
		long start = System.currentTimeMillis();
		Iterator<String> it = map.keySet().iterator();
		while (it.hasNext())
		{
			String key = it.next();
			byte[] ba = map.get(key);
		}
		long readDuration = System.currentTimeMillis() - start;

		System.out.println("count=" + count + "; size=" + size + "; write=" + insertDuration + "; read=" + readDuration);
	}

	public void shutdown()
	{
		hz.shutdown();
	}

}

The code is very simple. It initializes a hash map with on-heap or off-heap memory storage, depending on the case, then generates byte arrays of 10 KB each, fills them with random data, and stores them in the map. Then it reads them all back. During insertion and retrieval, we simply log how much time the operations take.

The max size of the map is set to Constants.MAX_SIZE, which is 5 million entries.
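
The Constants class is not shown in these posts; a minimal version consistent with that value would be:

package ca.renzhi.hazelcast.test;

public class Constants
{
	// Maximum number of entries per node, as used by MaxSizeConfig above
	public static final int MAX_SIZE = 5000000;
}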

At first look, the comparison seems unfair, since the on-heap test code allocates a new byte array for each object, while the off-heap test code re-uses the same buffer. This is not really the case. For on-heap storage, you allocate the byte array once and rely on the cleverness of the JVM to manage the memory properly; in the off-heap case, the data needs to be copied from the on-heap buffer into off-heap memory anyway. In real production code you would have to do the same thing, and with a little trick you can pre-allocate a memory buffer for the intermediate data and re-use it every time before putting the data into the off-heap storage.

And now, the main test for Java object cache:

package ca.renzhi.hazelcast.test;

import java.util.Iterator;
import java.util.Map;

import com.hazelcast.config.Config;
import com.hazelcast.config.InMemoryFormat;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MaxSizeConfig;
import com.hazelcast.config.MapConfig.EvictionPolicy;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class ObjectCacheTest
{
	private int count = 0;
	private int maxMem = 0;
	private HazelcastInstance hz = null;
	private Map<String, Customer> map = null;

	public ObjectCacheTest(int maxMem, int count)
	{
		this.maxMem = maxMem;
		this.count = count;

		System.out.println("Unsafe mode");

		Config cfg = new Config();
		cfg.setProperty("hazelcast.elastic.memory.enabled", "true");
		cfg.setProperty("hazelcast.elastic.memory.total.size", this.maxMem + "G");
		cfg.setProperty("hazelcast.elastic.memory.chunk.size", "1K"); // Default is 1K
		cfg.setProperty("hazelcast.elastic.memory.unsafe.enabled", "true");

		MapConfig mapConfig = new MapConfig();
		mapConfig.setInMemoryFormat(InMemoryFormat.OFFHEAP);
		mapConfig.setBackupCount(0);
		mapConfig.setEvictionPolicy(EvictionPolicy.LRU);
		mapConfig.setEvictionPercentage(2);
		MaxSizeConfig mxSizeCfg = new MaxSizeConfig(Constants.MAX_SIZE, MaxSizeConfig.MaxSizePolicy.PER_NODE);
		mapConfig.setMaxSizeConfig(mxSizeCfg);
		mapConfig.setName("TestCache");

		cfg.addMapConfig(mapConfig);

		hz = Hazelcast.newHazelcastInstance(cfg);
		map = hz.getMap("TestCache");
	}

	public ObjectCacheTest(int count)
	{
		this.count = count;

		System.out.println("Heap mode");

		Config cfg = new Config();
		MapConfig mapConfig = new MapConfig();
		mapConfig.setBackupCount(0);
		mapConfig.setEvictionPolicy(EvictionPolicy.LRU);
		mapConfig.setEvictionPercentage(2);
		MaxSizeConfig mxSizeCfg = new MaxSizeConfig(Constants.MAX_SIZE, MaxSizeConfig.MaxSizePolicy.PER_NODE);
		mapConfig.setMaxSizeConfig(mxSizeCfg);
		mapConfig.setName("TestCache");

		cfg.addMapConfig(mapConfig);

		hz = Hazelcast.newHazelcastInstance(cfg);
		map = hz.getMap("TestCache");
	}

	public void runTest()
	{
		System.out.println("Running test....");
		long start = System.currentTimeMillis();
		for (int i = 0; i < count; i++)
		{
			String name = "Customer " + i;
			String addr = i + " Park Avenue, #" + i;
			String phone = "123456789";
			Customer cust = new Customer(i, name, addr, phone);
			map.put(Integer.toString(i), cust);
		}
		long insertDuration = System.currentTimeMillis() - start;
		int size = map.size();
		start = System.currentTimeMillis();
		Iterator<String> it = map.keySet().iterator();
		while (it.hasNext())
		{
			String key = it.next();
			Customer cust = map.get(key);
		}
		long readDuration = System.currentTimeMillis() - start;

		System.out.println("count=" + count + "; size=" + size + "; write=" + insertDuration + "; read=" + readDuration);
	}

	public void shutdown()
	{
		hz.shutdown();
	}

}

This test code is very similar to the byte array cache test; the difference is that it stores a Java object in the hash map instead of a byte array. And here is the simple Java class used for testing:

package ca.renzhi.hazelcast.test;

import java.io.Serializable;

public class Customer implements Serializable
{
	private static final long serialVersionUID = -665013352562905224L;
	private int id;
	private String name;
	private String address;
	private String phone;

	public Customer(int id, String name, String addr, String phone)
	{
		this.id = id;
		this.name = name;
		this.address = addr;
		this.phone = phone;
	}

	public int getId()
	{
		return id;
	}

	public String getName()
	{
		return name;
	}

	public String getAddress()
	{
		return address;
	}

	public String getPhone()
	{
		return phone;
	}
}

A little more patience, and we will see the test results. Here is one last piece, the bash script that starts the test:

#!/bin/bash

function show_help {
    echo ""
    echo "test1.sh count mode"
    echo ""
    echo "where"
    echo "   count         Integer, number of objects to generate"
    echo "   mode          Memory mode, unsafe or heap"
    echo ""
}

if [ $# -ne 2 ]
then
	show_help
	exit 0 
fi

count=$1
mode=$2

curr_dir=`pwd`
CLASSPATH=$CLASSPATH:$curr_dir/libs/hazelcast/hazelcast-3.2.jar:$curr_dir/libs/log4j/log4j-core-2.0-beta9.jar:$curr_dir/libs/log4j/log4j-api-2.0-beta9.jar:$curr_dir/bin:

if [ $mode == "unsafe" ]
then
	java -Xms256m -Xmx4096m -XX:MaxPermSize=256m -XX:MaxDirectMemorySize=10G -cp $CLASSPATH ca.renzhi.hazelcast.test.HazelcastMemTest $count 10
else
	java -Xms256m -Xmx14336m -XX:MaxPermSize=256m -cp $CLASSPATH ca.renzhi.hazelcast.test.HazelcastMemTest $count
fi

This script needs a bit of explanation. To compare the two memory models, we try to allocate the same total amount of memory in both cases. For off-heap storage, the memory is split between heap memory, which is managed by the JVM, and direct-access off-heap memory, which is managed by my implementation of the “elastic memory” storage.

You might be asking, since you are managing your own off-heap memory storage already, why do you still allocate so much memory for Java heap? Why don’t you just put more into the off-heap pool?

Well, first of all, when we generate test data, there are quite a few intermediate Java objects living in heap memory before we get a chance to put them into the off-heap storage. The same goes for reading them back.

Secondly, you really need to understand the data structure management model inside Hazelcast. Hazelcast provides a set of built-in data structures, such as map, distributed map, queue, list, etc. When we are dealing with one of these, say a map, which is a key/value store, the implementation logic is managed by Hazelcast, including the management of the key set, the eviction policy, and so on. The off-heap storage is used only for storing data contents. Therefore, for a key/value pair, the key part is managed in heap memory, while the value part is stored in off-heap memory. If you have a lot of entries, you need to allocate enough heap memory for the JVM to handle the workload.

Therefore, depending on your use case, even if you use off-heap storage, such as the enterprise version of Hazelcast, you still need to figure out how much heap memory you need to allocate for your application.

OK, enough rambling, let's get to the test results. For byte array caching, I ran tests with 100K, 250K, 500K, 750K, 800K and one million entries. Here are the results for the off-heap storage model:

count=100000; size=100000; write=9600; read=2733
count=250000; size=250000; write=21497; read=6527
count=500000; size=500000; write=43982; read=12600
count=750000; size=750000; write=64904; read=19569
count=800000; size=800000; write=70220; read=20088
count=1000000; size=1000000; write=89807; read=25993

The first column is the count value provided on the command line, and the second is the size of the map after inserting the test data. The third column is the time to write that amount of test data into the map, and the fourth is the time to read it all back. Both times are in milliseconds.

And now, let’s see the results for on-heap storage model:

count=100000; size=100000; write=8281; read=2459
count=250000; size=250000; write=22448; read=6296
count=500000; size=500000; write=47521; read=10346
count=750000; size=750000; write=64818; read=16513
count=800000; size=800000; write=77322; read=25430

From the results above, we can see that when the number of entries is small, there's no advantage in using off-heap storage. The advantage starts to show when we reach 500K entries. And with the on-heap storage model, we never got to complete the test with one million entries; we ran into this error:

Running test....
Apr 22, 2014 2:59:05 PM com.hazelcast.partition.InternalPartitionService
INFO: [192.168.122.1]:5701 [dev] [3.2] Initializing cluster partition table first arrangement...
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
	at ca.renzhi.hazelcast.test.ByteArrayCacheTest.initDataForHeap(ByteArrayCacheTest.java:84)
	at ca.renzhi.hazelcast.test.ByteArrayCacheTest.runTest(ByteArrayCacheTest.java:112)
	at ca.renzhi.hazelcast.test.HazelcastMemTest.testByteArrayCache(HazelcastMemTest.java:58)
	at ca.renzhi.hazelcast.test.HazelcastMemTest.main(HazelcastMemTest.java:70)
Exception in thread "hz._hzInstance_1_dev.HealthMonitor" java.lang.OutOfMemoryError: Java heap space

Looking at the trend, we can, kind of, extrapolate what it would look like as the number of entries grows even larger and the JVM hits worse GC cycles. I don't have machines with 64GB of RAM to test with, but in this case it seems that off-heap storage manages memory better for a large number of entries.

Before we go further, let's look at the test results for caching Java objects. Here I ran tests with 100K, 500K, 1 million, 2 million, and 5 million entries, since each Java object is much smaller than the 10 KB arrays used in the previous test. The following results are for off-heap storage:

count=100000; size=100000; write=4336; read=4475
count=500000; size=500000; write=16060; read=19291
count=1000000; size=1000000; write=29587; read=36663
count=2000000; size=2000000; write=57830; read=73778
count=5000000; size=4777740; write=147139; read=178364

OK, what's wrong with the last line? We set out to create 5 million entries, and after inserting them, the map size showed only 4,777,740 entries. Well, in the code above we set the max size of the hash map to 5 million entries, and when we hit the limit, the eviction policy kicked in. Using the LRU (least recently used) algorithm, some entries (2%, as set in our code) were evicted. Nevertheless, I was a bit surprised by how early the eviction process kicked in.

Therefore, the write time in the last line is for writing 5 million entries, and the read time is for reading only 4,777,740 entries.

In the test above, there's another thing to plan carefully when we use off-heap memory for caching Java objects. We set the chunk size to 1 KB in our configuration, but the Java objects we put in the cache are significantly smaller than 1 KB. Therefore, in this case, we wasted quite a bit of memory. We could certainly set a smaller chunk size, but at the cost of more management overhead.

And here are the results for on-heap storage:

count=100000; size=100000; write=4084; read=3998
count=500000; size=500000; write=14763; read=19074
count=1000000; size=1000000; write=29877; read=36636
count=2000000; size=2000000; write=59254; read=73251
count=5000000; size=4712401; write=141923; read=167874

Ah, as we can see, storing Java objects in off-heap memory is slower. This is not much of a surprise, as the Java object must be serialized before being stored into off-heap memory, and deserialized when it is read back. We used the default Java serialization in this test, which might not be the most efficient; it would be interesting to compare with other serialization mechanisms.

It would also be interesting to see how Hazelcast behaves when we reach tens of millions, or even hundreds of millions, of entries. Again, I don't have the hardware necessary to perform that test.

As I said at the beginning, this test is far from scientific. I did it to get an idea of how well my own implementation of off-heap storage for Hazelcast performs. The implementation in the Hazelcast enterprise version might use a more efficient algorithm, and its test results might be completely different. Besides, the enterprise version also comes with a very nice management console (OK, that is just my impression from the published documentation and screenshots; I have no first-hand experience).

Another lesson learned: off-heap storage works better for objects that are larger than 1 KB, or whatever chunk size you decide to use. Obviously, you need to find a balance between management overhead and wasted memory.

In the end, Hazelcast is a very nice framework to use in your project. Simple, elegant.

Soliloquy

WordPress ate my blog

I upgraded WordPress from time to time, but not very frequently. Last time, when I saw a reminder that the new version 3.9 was available, I was tempted. Without thinking too much, I clicked the upgrade button. That was one big mistake. The upgrade hung in the middle of the process and never came back. The site was gone. After struggling for a few days trying to repair it manually, I gave up and started from scratch again.

So that was it. That sucks!