A Data Engineering Design Pattern and Trial of HBase and Redis

Key-value stores can be a useful piece of your software stack, but choosing the right one for your use cases can be hard. Moreover, as open-source offerings evolve, it’s nice to be able to do quick performance analysis on how a new offering can improve your software stack. Today, I’ll be sharing my experience with two different key-value store offerings with different strengths and weaknesses: HBase and Redis.

The use case I’ll focus on is covered in my earlier blog post about my Insight Data Engineering project. In a nutshell, I created a simple REST API that exposes real estate listing data I gathered from Trulia’s API. One can query a geographic location (state, county, city, or zipcode) for a date range and get the average listing price for that location and 10 nearby locations.

If you are new to Redis, it is an in-memory key-value store that can hold several different data structures as values. Redis also has a cluster offering; it’s unclear how stable it is, since it has been labeled alpha or unstable for over two years, but I would be eager to give it a try, particularly given what follows. If you are new to HBase, it is a NoSQL database that runs on top of HDFS, which means it can scale with a Hadoop installation. Additionally, by default HBase uses a least-recently-used block cache to keep blocks of data in memory on the region servers (typically co-located with HDFS data nodes).

Why am I comparing these two quite different technologies? For my particular use case, I didn’t have that much data in HBase. Maybe I could store it all in memory using something simpler than standing up a Hadoop cluster. How can I use a good software design pattern so that I can switch the underlying technology?

To avoid being tied to a particular technology, it is good practice to abstract different functionalities in your codebase. This is way easier said than done. Let me share how my project, Theft-Market, evolved. First, I had separate functions for fetching data from Trulia, parsing the XML response, and writing to a datastore, but they all lived in the same class and their functionalities were intertwined (i.e., the parser’s output was a dictionary that matched my HBase schema). Then, I moved the parsing functions into Cython for a 2x improvement there. Finally, I re-contracted the parser-to-datastore-writer interface to accept a more schema-agnostic dictionary for putting into a datastore, and I defined the same functions in my HBase Manager and Redis Manager classes. This last change enabled me to swap out what the self.kv_store_manager object was in the Trulia data fetcher for writes and in the Flask web server for reads. The next minor step is to move this choice into configuration! To go a step further with this concept of abstraction, it could make sense to put the abstractions behind internal REST APIs, so functionality is decoupled from a particular language and even from particular computing hardware.
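To make the kv_store_manager idea concrete, here is a rough sketch of the shared contract. The class and method names, the 'stats' column family, and the key layout are simplified stand-ins rather than the exact Theft-Market code; the point is that both managers accept the same schema-agnostic dictionary and expose the same read call, so the caller only ever holds a generic manager reference.

# A minimal sketch of the abstraction, not the exact Theft-Market classes.
# Both managers accept the same schema-agnostic dictionary from the parser.

class HBaseManager(object):
    def __init__(self, connection, table_name):
        self.table = connection.table(table_name)

    def put_listing(self, row_key, record):
        # Map the generic record onto HBase column-family:qualifier pairs
        # ('stats' is an illustrative column family).
        self.table.put(row_key, {'stats:%s' % k: str(v) for k, v in record.items()})

    def get_listings(self, row_prefix, limit=10):
        return list(self.table.scan(row_prefix=row_prefix, limit=limit))


class RedisManager(object):
    def __init__(self, client):
        self.client = client

    def put_listing(self, row_key, record):
        # Store the same record as a Redis hash under the same key.
        self.client.hmset(row_key, record)

    def get_listings(self, row_prefix, limit=10):
        # KEYS is fine for a sketch, but not something to lean on in production.
        keys = sorted(self.client.keys(row_prefix + '*'))[:limit]
        return [(k, self.client.hgetall(k)) for k in keys]


# The fetcher and the Flask server only ever see the shared interface:
# self.kv_store_manager = HBaseManager(...)  # or RedisManager(...)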

The performance comparison I’ll present is more qualitative and anecdotal than precise and scientific, but it exemplifies the stark underlying differences between Redis and HBase. First, the setup: I’m using a 4-node cluster of 3 x m1-medium and 1 x m1-large with Cloudera Manager version 5.1.0, and I’m using the Thrift API with HappyBase for accessing HBase from Python. Redis is the standard standalone install: apt-get install redis-server for the server and pip install redis for the client. I’m using the m1-large with 8 GB of memory for running the standalone Redis store. See the HBase/Redis schema part of the Theft-Market Schema page for the key and value structure.
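For reference, the client setup boils down to a few lines. The host names and the table name below are placeholders, not the actual cluster configuration.

import happybase   # Python client for the HBase Thrift server
import redis       # standard redis-py client

# HappyBase talks to the HBase Thrift server (default port 9090).
hbase_conn = happybase.Connection('hbase-thrift-host', port=9090)
listings_table = hbase_conn.table('listings')  # table name is a placeholder

# redis-py talks to the standalone Redis install (default port 6379).
redis_client = redis.StrictRedis(host='redis-host', port=6379, db=0)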

Webserver Pipeline

First, the performance of HBase: getting 10 rows that were near each other (thanks to good row-key design) took around 100 ms. I noticed that while I was running puts to HBase (to load additional data), the response time would triple, but it never stalled for seconds. One issue that I continue to find is that the HBase Thrift server, which is what HappyBase connects to, is fragile and needs strong type checking of inputs before they are sent. I also increased the Java heap size, which seemed to help a bit. Occasionally, it needs restarting anyway, so this is definitely something to be aware of. I may try the native Java API next time.
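That 10-row read is essentially a short prefix scan over the row keys. A minimal sketch with HappyBase follows; the key prefix format here is simplified, not the exact Theft-Market schema.

def get_nearby_rows(table, geo_prefix, limit=10):
    # Rely on the row-key design to keep nearby locations adjacent in HBase,
    # so a single prefix scan returns the location and its neighbors.
    return list(table.scan(row_prefix=geo_prefix, limit=limit))

# e.g. rows = get_nearby_rows(listings_table, 'MA_Boston')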

Like I mentioned earlier, the total data footprint was not huge at 4 GB (as reported by hdfs dfs -du -h /hbase), so I thought I could put all of it into memory with an in-memory datastore! I used an m1-large (without Hadoop installed) for testing Redis. When I loaded a small fraction of my dataset, around 1/100 to 1/1000 of it, I was getting really fast access times, around 20 ms! Things stayed responsive as more and more of the dataset went into Redis, until I reached 80% of my 8 GB of memory used. First, I was surprised that I was already up to 6 GB of memory usage with 40% of the data yet to be loaded. I have read that Redis can become quite fragmented, causing up to 10x more memory usage. Next time I try this out, I’ll check the memory fragmentation ratio available from the INFO command in redis-cli. I’m not sure why I would have so much fragmentation when I am not overwriting any values associated with a key. Second, I was more surprised that my lookups were really slow, on the order of 5 seconds for the same 10-row lookup as before. The performance was poor both while I was adding data to Redis and after I stopped. [EDIT: see a follow-up post on how to use an advanced Redis feature to meet my use case]
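For next time, the fragmentation check is a one-liner from redis-py; it reports the same mem_fragmentation_ratio field that the INFO command shows in redis-cli (the host below is a placeholder).

import redis

r = redis.StrictRedis(host='redis-host', port=6379)  # placeholder host
info = r.info()
# Ratio of the OS-reported resident set size to the memory Redis has allocated;
# values well above 1.0 suggest fragmentation.
print("fragmentation ratio: %s, used memory: %s"
      % (info['mem_fragmentation_ratio'], info['used_memory_human']))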

Overall, I was disappointed that I could not use Redis for my use case [EDIT: see the follow-up post], but I was excited to come away with a better understanding of when to use it and a stronger argument for Hadoop on non-gigantic datasets. As an aside, I do feel Hadoop is useful with 1 TB or less of data more often than one might think, particularly if there is potential for more data to become part of a future use case. Lastly, I was also really excited about applying the simple practice of abstraction over the course of this investigation. It made my software more extensible for future tools, like when I try out things like Redis Cluster!

Cython Performance Case Study

I’ve been learning different programming languages since the early 2000s. It can be tempting to clutch onto a single language and use it no matter what. There are advantages to mastering a single language at a personal level and at a company level. However, the disadvantages include:

(1) not using new languages and their inherent strengths,

(2) being reliant on that language’s support for new tools,

(3) your build and testing infrastructure are stuck in a language (I really like Makefiles because they work in all languages!),

(4) missing out on how using multiple languages can guide your implementation toward clearly defined modules, and

(5) not learning new languages can limit us as programmers (see Nathan Marz’s blog that suggests learning new languages makes us better programmers).

I started programming with C and C++, but higher-level languages like Python are attractive in terms of code density and readability, which in my experience greatly help maintainability. Many Java and C++ software engineers dismiss Python as a production-grade language because it is much slower and it is not statically typed (oftentimes they dismiss each other as well :-D, but that’s another topic).

I wanted to do a short case study on sorting unique numbers in linear time using bit vectors in each of these languages. I wanted to see how much slower Python would be than the statically typed languages for bit manipulation and, as the title alludes, compare all of these to Cython.

The precise task: sort a file of unique numbers and write them out to another file in sorted order. I plan to use a bit vector built from an integer type, setting the bit at the position corresponding to each number’s value. Once every number is represented by a set bit, I can loop through each bit of the bit vector and print out the numbers in order.

For me it was natural to think about implementing this algorithm in C first. Here is a link to the full 32-bit-based implementation. The important snippets are below:

// bit vector
uint32_t *bvp = (uint32_t*)malloc(bv_size*sizeof(uint32_t));

// clear bit vector
for (idx = 0; idx < bv_size; idx++) {
  bvp[idx] = 0;
}

...

// ---- reads file and sets bits in bit vector ----
while (fscanf(ifp, "%d", ph_num) != EOF) {

  idx = *ph_num/32;    // calc which int in bv to set
  bit = *ph_num % 32;    // calc which bit in int to set

  comp_int = 1;
  comp_int<<=bit;
  *(bvp + idx) = (*(bvp + idx) | comp_int); // set bit
}

// ---- outputs sorted numbers in file ----
for (idx = 0; idx < bv_size; idx++) {

  comp_int = 1;
  if (*(bvp + idx) > 0) { // small optimization
    for (bit = 0; bit < 32; bit++) {
      if (comp_int & *(bvp + idx))
        fprintf(ofp,"%d\n",idx*32+bit);
      comp_int<<=1;
    }
  } 
}

Then, I wrote this in Python. I had never cared deeply about an integer’s bit length in Python before, so my first attempt was to use NumPy’s fixed-width number types. Here is a link to the full Python implementation. The important snippets are below:

import numpy

bv_size = 321500   # size of the bit vector; mirrors the constant in the linked implementations
int_size = 32

bit_vector = numpy.zeros(bv_size, dtype=numpy.uint32)

# clear bit vector (mirrors the C version; numpy.zeros already does this)
for idx in xrange(bv_size):
  bit_vector[idx] = 0

# reading from file
with open('unsorted_phonebook.txt','r') as ifh:
  for line in ifh:
    ph_num = int(line)
    bit = ph_num % int_size
    idx = ph_num / int_size
    comp_int = numpy.uint32(1)
    comp_int <<= bit
    bit_vector[idx] |= comp_int

# writing the sorted numbers out
with open('sorted_phonebook_python.txt','w') as ofh:
  for idx in xrange(bv_size):
    comp_int = numpy.uint32(1)
    if bit_vector[idx] > 0: # small optimization
      for bit in xrange(int_size):
        if comp_int & bit_vector[idx]:
          ofh.write(str(idx*int_size + bit) + '\n')
        comp_int <<= 1

Opening the files with Python’s with statement takes care of cleanup and error handling for free, showing off Python’s nice code density, but of course it’s slow (how much slower, we’ll see below). Here is a link to the full Cython implementation:

def sort_phonebook():

    DEF bv_size = 321500 # 10M / 32 supports 0 - 9999999 (or ph:999-9999)
    DEF int_size = 32

    cdef int bit
    cdef int idx
    cdef int ph_num
    cdef unsigned int comp_int
    cdef unsigned int bit_vector_i

    cdef unsigned int bit_vector[bv_size]

    for idx in xrange(bv_size):
        bit_vector[idx] = 0

    # reading from file
    with open('unsorted_phonebook.txt','r') as ifh:
        for line in ifh:
            ph_num = int(line)
            bit = ph_num % int_size
            idx = ph_num / int_size
            comp_int = 1
            comp_int <<= bit
            bit_vector[idx] |= comp_int

    with open('sorted_phonebook_cython.txt','w') as ofh:
        for idx in xrange(bv_size):
            comp_int = 1
            if bit_vector[idx] > 0: # small optimization
                for bit in xrange(int_size):
                    if comp_int & bit_vector[idx]:
                        ofh.write(str(idx*int_size + bit) + '\n')
                    comp_int <<= 1

Notice how the Cython code is not very different from the Python code! Cython needs to compile this .pyx file before Python can call the function. The setup file is here:

from distutils.core import setup
from Cython.Build import cythonize
setup(
    ext_modules = cythonize("sort_phonebook_cython.pyx")
)

And then execute the setup with the following command:

python setup_cython_phonebook_sort.py build_ext --inplace

Now we can call this function in another Python program with:

import sort_phonebook_cython
sort_phonebook_cython.sort_phonebook()

For more comprehensive benchmarking, I also wrote this algorithm in Java. For brevity, see the Java implementation here.

On to the performance results. I sorted 6- and 7-digit numbers with these programs.
The results for 6-digit numbers:


$ make
python create_phonebook_file.py
gcc sort_phonebook.c -o sort_phonebook_in_c
javac SortPhonebook.java
python setup_cython_phonebook_sort.py build_ext --inplace
running build_ext
python run_all_tests.py
0.608454942703 seconds to sort in c
1.06622695923 seconds to sort in cython
12.8049960136 seconds to sort in python
1.31410098076 seconds to sort in java
diff sorted_phonebook_c.txt sorted_phonebook_cython.txt
diff sorted_phonebook_c.txt sorted_phonebook_python.txt
diff sorted_phonebook_c.txt sorted_phonebook_java.txt

The results for 7-digit numbers (I excluded the creation of the input file since this is really slow in Python with my current implementation):

$ make run_tests
python run_all_tests.py
4.33113193512 seconds to sort in c
10.2070810795 seconds to sort in cython
130.906479836 seconds to sort in python
10.8574941158 seconds to sort in java
diff sorted_phonebook_c.txt sorted_phonebook_cython.txt
diff sorted_phonebook_c.txt sorted_phonebook_python.txt
diff sorted_phonebook_c.txt sorted_phonebook_java.txt

A couple of points:

(1) Confirmed (anecdotally) that the runtime of these functions grows mostly linearly (roughly 10x) going from 6-digit to 7-digit numbers (a 10x growth in the number of numbers, 1M to 10M).

(2) The Python implementation was about 20x slower than C for sorting 1M numbers and about 30x slower for sorting 10M numbers.

(3) Cython edged out Java for sorting 1M numbers, but both implementations were roughly 2-2.5x slower than C.

(4) The implementations are 25 source lines of code (SLOC) for Python, 35 SLOC for Cython, 51 SLOC for Java, and 56 SLOC for C. There is an argument for measuring performance in terms of this count as well, i.e., developer time.

There you have it. Try Cython if you have a computationally complex function in Python (or one with high big-O constants, as in this evaluation) that you’d like to avoid rewriting in C or Java. But the ultimate performance king is still C.

My Insight Data Engineering Project

The goal of this project was to analyze historical real-estate listing data using data engineering tools, so I built a tool called Theft Market. It leverages Trulia’s API to gather their historical data. See Trulia’s developer page for an overview of their API.

Overview

To bootstrap the data pipeline, Theft Market repeatedly calls Trulia’s API to get the list of states, cities, and zipcodes in the US. It parses the XML responses and puts this information into its Meta Store, a MySQL database. The TruliaInfoFetcher calls getStates, getCitiesInStates, and getZipCodesInState to populate the Meta Store; see Trulia’s information library page for more about these calls.
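The fetching itself is ordinary request-and-parse code. Below is a hedged sketch of the idea; the endpoint URL, API key, and XML element names are placeholders rather than Trulia’s exact interface.

import requests
import xml.etree.ElementTree as ET

TRULIA_ENDPOINT = 'http://api.trulia.com/webservices.php'  # placeholder URL
API_KEY = 'my-api-key'                                      # placeholder key

def fetch_states():
    # Ask the location-info library for the list of states, then parse the XML.
    params = {'library': 'LocationInfo', 'function': 'getStates', 'apikey': API_KEY}
    response = requests.get(TRULIA_ENDPOINT, params=params)
    root = ET.fromstring(response.content)
    # Element names here are illustrative; adjust to the actual response schema.
    return [(s.findtext('stateCode'), s.findtext('name')) for s in root.iter('state')]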

With information about different geographic areas, Theft Market repeatedly calls Trulia’s API to get real estate data about each of the areas, which takes approximately 50,000 API calls. The TruliaDataFetcher functions used are getStateStats, getCityStats, and getZipCodeStats; see Trulia’s stats library page for more about these calls. The results of these calls are then split in the pipeline.

Pipeline Details

The TruliaDataFetcher uses HappyBase to put data directly into HBase (via the Thrift API) as soon as it finishes parsing a stats response. The stats are also sent to HDFS by way of FluentD, which forwards them to the WebHDFS port on the HDFS NameNode. FluentD appends each record to a file of records in HDFS, and files are partitioned hourly as currently configured. Each line of these record files is a JSON object for one record, allowing flexibility in what was parsed out of the XML.
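The direct HBase write amounts to one HappyBase put per parsed stats record. A simplified sketch follows; the row-key layout and the 'listing' column family are illustrative, not the exact Theft-Market schema.

import json

def put_city_stats(table, state_code, city, week_date, avg_price, num_listings):
    # Row key groups a city's weekly stats together; the column qualifier is
    # the week's date and the value is a small JSON blob with the average
    # listing price ('a') and the number of listings ('n').
    row_key = '%s_%s' % (state_code, city)
    value = json.dumps({'a': str(avg_price), 'n': str(num_listings)})
    table.put(row_key, {'listing:%s' % week_date: value})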

Subsequently, these large files in HDFS are processed by Hadoop Streaming with these Python map-reduce jobs, which translate the JSON objects into structured, tab-separated files that back Hive external tables. To create the Hive tables, use the create table script; see the external table creation query for an example that creates a city table. One could also write Pig, Cascading, etc. scripts against the nicely structured output of the Hadoop Streaming step mentioned above. Following that, there are a handful of other ad hoc queries in the hive directory. That is the current state of the batch-processing, deep-dive analytics layer of Theft Market.
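The streaming jobs are plain line-oriented scripts. A minimal mapper in that spirit is sketched below; the field names and column order are illustrative, not the project’s actual schema.

#!/usr/bin/env python
# Minimal Hadoop Streaming mapper: turn one JSON record per line into
# tab-separated fields suitable for a Hive external table.
import json
import sys

FIELDS = ['state_code', 'city', 'week_date', 'num_bedrooms',
          'avg_listing_price', 'num_listings']  # illustrative column order

for line in sys.stdin:
    try:
        record = json.loads(line)
    except ValueError:
        continue  # skip malformed lines
    print('\t'.join(str(record.get(f, '')) for f in FIELDS))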

Webserver Pipeline

The user is exposed to a simple (read-only) REST API for getting statistics about particular geographic areas. The REST call is handled by a combination of Apache (the server), WSGI, and Flask. The Flask app holds an instance of an object that handles calls to the MySQL manager and an instance of the HBase manager. Apache runs multiple Flask threads, with each thread having its own MySQL and HBase manager. The Flask web server routes calls to functions in the RestCallHandler, which coordinates a combination of MySQL and HBase queries to answer the REST call quickly (see the diagram above).
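A stripped-down version of the routing layer looks roughly like this. The handler body is a stand-in for the real RestCallHandler, which is where the MySQL and HBase coordination actually happens.

import json
from flask import Flask, request, jsonify

app = Flask(__name__)

class RestCallHandler(object):
    """Stand-in for the real handler that coordinates MySQL and HBase queries."""
    def city_average(self, query):
        # The real handler resolves the city and its neighbors via MySQL,
        # scans HBase for the weekly stats, and recomputes the average
        # over the requested date range.
        return {'average_listing_price': None, 'query': query}

rest_call_handler = RestCallHandler()

@app.route('/data/city/average')
def city_average():
    # The search parameters arrive as a JSON dictionary in the 'q' parameter
    # (see the API format described below).
    query = json.loads(request.args['q'])
    return jsonify(rest_call_handler.city_average(query))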

MySQL and HBase timings

The timing of these operations is quite fast. The GPS lat/lon distance query takes about a millisecond, whereas the HBase access and the manipulation of a row’s columns take 100-200 ms depending on how many weeks are in the date range. The column values are keyed by the date of the listing week (e.g., 2014-06-30), with a textual JSON value storing the average listing price and the number of listings that make up that average (e.g., value="{'a':'1000000','n':'13'}"). Both the average and the count are necessary for recomputing the average over the desired date range.

The schema could be enhanced to avoid parsing a Python dictionary for each key, but it gives acceptable performance with the data I have; I noticed that Trulia’s historical data goes back to 2008 at the earliest (so that is 52 weeks/year * 6 years = 312 maximum columns per row). I also don’t take advantage of using different column families for other attributes that I would not often access alongside listing averages.
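Recomputing the average over a date range from those per-week values is just a weighted average. A small sketch, assuming the stored values are valid JSON:

import json

def combine_weekly_stats(week_values):
    # Combine per-week {"a": average, "n": count} values into one overall
    # average and total listing count for the requested date range.
    total_listings = 0
    total_price = 0.0
    for raw in week_values:
        stats = json.loads(raw)
        n = int(stats['n'])
        total_listings += n
        total_price += float(stats['a']) * n
    if total_listings == 0:
        return None, 0
    return total_price / total_listings, total_listings

# e.g. combine_weekly_stats(['{"a": "1000000", "n": "13"}', '{"a": "900000", "n": "7"}'])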

The API format is straightforward: the caller passes a dictionary (described below) to a base URL corresponding to the query of interest. The following URLs are supported:

ipaddress/data/city/volume
ipaddress/data/city/average
ipaddress/data/zipcode/volume
ipaddress/data/zipcode/average

where volume is the total number of listings over the time period and average is the average listing price over the time period. The dictionary carries the remainder of the search parameters. All dictionaries contain the following key-value pairs:

– start_date (YYYY-MM-DD)
– end_date (YYYY-MM-DD)
– num_bedrooms (positive integer)

For city queries provide:

– state_code (XX)
– city (city name) (some browsers may need a ‘%20’ inserted for spaces in the city name)

For zipcode queries provide:

– zipcode (XXXXX)

A full example of a city average listings call:

ipaddress/data/city/average?q={"state_code":"MA","city":"Boston","num_bedrooms":3,"start_date":"2012-01-01","end_date":"2014-01-01"}

A full example of a zipcode volume listings call:

ipaddress/data/zipcode/volume?q={"zipcode":"02458","num_bedrooms":3,"start_date":"2012-01-01","end_date":"2014-01-01"}
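From Python, the same call can be made with requests, which handles the URL encoding of the q dictionary; the host below is a placeholder.

import json
import requests

query = {"zipcode": "02458", "num_bedrooms": 3,
         "start_date": "2012-01-01", "end_date": "2014-01-01"}
response = requests.get('http://ipaddress/data/zipcode/volume',
                        params={'q': json.dumps(query)})
print(response.json())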

Overall, this was a really fun project, and I had a functioning prototype within 2-3 weeks of work. One issue I had was using the Thrift API to put data into HBase; I would have used the Java API if I had known how flaky the Thrift server is. Also, having port 80 open on Amazon Web Services machines is asking for trouble. I saw dozens of different types of access in the Apache server access log, from researchers’ web crawlers to the commonly known w00tw00t attack. I’d need to learn how to configure Apache not to crash on seemingly simple GETs (and here I thought Apache was a more hardened server platform than Flask).

Right now the project is offline because AWS costs money. The 4-node cluster (1 x m1-large, 3 x m1-medium) was paid for by Insight. I’d like to see if I can get the REST API portion of this project running on a single node with decent performance soon. Without a doubt, I will be distracted by working hard to get started at my next job (to be determined).