Parsing Millions of Large JSON files in Haskell

11/1/2018
Los Angeles freeways 10 and 110

In one of my past jobs, I had a directory on a SAN with millions of JSON files of sizes in the 10mb to 12mb range. One of the side-tasks I undertook was building a program that could search through these files for particular matching values, but I wanted it to be quick, so it really wasn’t going to work in Python, which is usually my most socially acceptable form of coding.

Enter Haskell, Enter Aeson

Haskell is a language that I enjoy writing in my free time (this site, for example, is written in Haskell), and it’s not a bad language for scripting, especially when you require raw performance. It does have an intimidating learning curve, however. (In my own case, it took probably four years of Haskell hobbying before I could write the program I’m about to describe.) At any rate, for this project I decided to try to Haskell my way out of it.

For JSON in particular, there’s a Haskell library known as aeson, which is famous for being fast and reasonable. They even use it at Facebook to parse huge amounts of JSON insanely quickly.

Now, the JSON documents I was parsing are things I’ll refer to as projects and they were submitted by end-users to be ingested by a Java application. Sometimes things would go wrong and it would be worth it to quickly filter these files to find the document that caused the problem.

Here’s an example type in a module called MyProjects.Types as well as a FromJSON instance definition for my type:

{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TemplateHaskell #-}

module MyProjects.Types where

import Data.Aeson
import Data.Int (Int64)
import Control.Lens
import qualified Data.HashMap.Strict as HM
import qualified Data.Text as T
import GHC.Generics


data Project =
  Project { _projName :: !Name
          , _projRevision :: !Revision
          , _dtype :: !Dtype
          , _epochTimestamp :: !Int64
          , _original :: HM.HashMap T.Text Value
          } deriving (Generic, Show, Eq)

instance FromJSON Project where
  parseJSON = withObject "someProj" $ o -> do
    _projName <- (o .: "NAME")
     _projRevision <- o .: "REVISION"
    _dtype <- o .: "TYPE"
    _epochTimestamp <- o .: "TIMESTAMP"
    let _original = o
    return Project{..}

Because I sometimes needed access to the original, full JSON value, I also stashed that on my record under the key _original. Here, I wanted to specify things like _projName and _projRevision as specific types, Name and Revision respectively, even though they’re just text values, so that I could treat them differently and never get mixed up within a function that needs to take a Name and a Revision as parameters. This is one thing that Haskell is great for.

However, these custom types will also need their own FromJSON instances:

newtype Name = Name T.Text deriving (Show, Eq)
newtype Revision = Revision T.Text deriving (Show, Eq)
newtype DType = DType T.Text deriving (Show, Eq)
data ArbitraryKeyValue = ArbitraryKeyValue T.Text (Maybe T.Text) deriving (Show, Eq)

instance FromJSON Name where
    parseJSON (String v) = pure (Name v)
    parseJSON _ = fail "a String value for Name"

instance FromJSON Revision where
    parseJSON (String v) = pure (Revision v)
    parseJSON _ = fail "a String value for Revision"

instance FromJSON DType where
    parseJSON (String v) = pure (DType v)
    parseJSON _ = fail "a String value for DType"

makeLenses ''Project

Searching, Filter Functions, and Monoids

After I could deserialize one of these JSON documents into a type that my program understands, I needed to figure out how filter a massive number of these files, but because my reason for filtering was that something unforeseen had gone wrong with the ingesting application, my filters could be any arbitrary keys and values: I wouldn’t know beforehand what types of values I’d be looking to find in my documents. These files were also scattered around the filesystem in a directory hierarchy organized by timestamp down to the minute, like so: YYYY/MM/DD/HH/mm, so sometimes it was useful to find all files for a particular project name. That’s what I called a common filter, but otherwise I could be tasked with seeing whether a key was present or whether a value at a key matched a particular value I had in mind.

For filtering then, I decided to define a filter-type that looked like this:

{-# LANGUAGE TemplateHaskell #-}

module MyProjects.Filters where

import Control.Lens
import Data.Aeson
import qualified Data.HashMap.Strict as HM
import Data.Monoid
import qualified Data.ByteString as B

import MyProjects.Types

newtype ProjectFilter = ProjectFilter { filter_ :: Project -> Bool }

The idea is that a filter is a function from a Project -> Bool, which is actually the canonical example given for the contravariant typeclass. I knew about contravariant at the time, but wanted instead to be able to arbitrarily combine ProjectFilters into one larger one, which sounded like a simple monoid to me, so I went with that:

instance Monoid ProjectFilter where
  mempty = ProjectFilter (const True)
  mappend (ProjectFilter f) (ProjectFilter g) = ProjectFilter $ project -> f project && g project

A Monoid has an identity function and a way to combine two values. In this case, we need an identity function for a function from Project -> Bool, so we’ll just use const True. To combine two of these functions together, we’ll simply run the left and then run the right and && them together. Thus, we’ve taken two Project -> Bool functions and rolled them into one Project -> Bool function and either side could be mempty.

After that, I defined some specific, commonly-used filter functions:

validProject :: B.ByteString -> Maybe Project
validProject = decodeStrict

matchName :: Name -> ProjectFilter
matchName name' = ProjectFilter $ project -> project ^. projName == name'

matchRevision :: Revision -> ProjectFilter
matchRevision rev = ProjectFilter $ project -> project ^. projRevision == rev

matchDataType :: DType -> ProjectFilter
matchDataType dType' = ProjectFilter $ project -> project ^. dType == dType'

matchNameRevision :: Name -> Revision -> ProjectFilter
matchNameRevision name rev = (matchName name) <> (matchRevision rev)

matchArbKeyValue :: ArbitraryKeyValue -> ProjectFilter
matchArbKeyValue (ArbitraryKeyValue key Nothing) = ProjectFilter $ project -> HM.member key (project ^. original)
matchArbKeyValue (ArbitraryKeyValue key (Just val)) = ProjectFilter $ project ->
  case HM.lookup key (project ^. original) of
    Nothing -> False
    Just someVal -> someVal == String val

What I really wanted was to pass a collection of filters around with the values I was looking for and have all the necessary ones be run against any input JSON Value, so I created a StdProjectFilters record:

data StdProjectFilters =
  StdProjectFilters {
                    _nm :: Maybe Name
                    , _rev :: Maybe Revision
                    , _dataTyp :: Maybe Dtype
                    , _arbKeyVal :: Maybe ArbitraryKeyValue
                    } deriving (Eq, Show)

makeLenses ''StdProjectFilters

After that, I made a function which allowed me to take a StdProjectFilters value and collapse all the desired values we’re filtering for into one ProjectFilter using my Monoid instance defined above (<> is an alias for mappend):

makeProjectFilters :: StdProjectFilters -> ProjectFilter
makeProjectFilters stdFilts = name <> rev <> datatyp <> arbKV
  where
    name = case stdFilts ^. nm of
      Nothing -> mempty
      Just (nm') -> matchName nm'
    rev = case stdFilts ^. rev of
      Nothing -> mempty
      Just (rv) -> matchRevision rv
    datatyp = case stdFilts ^. dataTyp of
      Nothing -> mempty
      Just (dtyp) -> matchDataType dtyp
    arbKV = case stdFilts ^. arbKeyVal of
      Nothing -> mempty
      Just (arbKeyV) -> matchArbKeyValue arbKeyV

Okay, that’s a decent amount of types and filters, and there are probably ways to condense it all. Still, using the above, I can create a single ProjectFilter out of a potential collection of arguments by mappending together a bunch of Maybe values to search for into one combined filter-function.

Conduit, Parsing and Such

For the next part of this project, I decided to use Conduit in order to stream my filter results with bounded memory usage. My plan with this program was to just dump matching filenames out to stdout, so Conduit is a nice choice because loading millions of 15mb JSON files could be lead to some tricky space issues in Haskell if done poorly.

My first version of this code used was not multi-threaded and looked like this:

module Search where

import           Conduit               ((.|))
import qualified Data.Conduit               as C
import qualified Data.Conduit.Combinators               as C
import           Control.Monad
import           Control.Monad.IO.Class   (MonadIO, liftIO)
import           Control.Monad.Trans.Resource (MonadResource)
import qualified Data.ByteString       as B
import           Data.List             (isPrefixOf)
import           Data.Maybe            (fromJust, isJust)
import           System.Path.NameManip (guess_dotdot, absolute_path)
import           System.FilePath       (addTrailingPathSeparator, normalise)
import           System.Directory      (getHomeDirectory)

import           MyProjects.Filters

-- Single-threaded version with bounded memory usage
mainStreamingConduit :: ProjectFilter -> FilePath -> IO ()
mainStreamingConduit filterFunc searchDir = do
  C.runConduitRes $
      sourceFilesFilter filterFunc searchDir .| C.mapM_C (liftIO . putStrLn)

sourceFilesFilter :: (MonadResource m, MonadIO m) => ProjectFilter -> FilePath -> C.Source m String
sourceFilesFilter projFilter dirname' =
    C.sourceDirectoryDeep False dirname'
    .| parseProject projFilter

parseProject :: (MonadResource m, MonadIO m) => ProjectFilter -> C.Conduit FilePath m String
parseProject (ProjectFilter filterFunc) = do
  C.awaitForever go
  where
    go path' = do
      bytes <- liftIO $ B.readFile path'
      let isProj = validProject bytes
      when (isJust isProj) $ do
        let proj' = fromJust isProj
        when (filterFunc proj') $ C.yield path'

I think of Conduit as coroutines with smart resource-handling bolted on, which you can kind of see in parseProject. This function is initialized with a ProjectFilter and it becomes Conduit which has FilePath objects coming in as input and while it yields Strings as output. These String outputs are simply filenames where both the JSON is parsed as a valid Project and the input filters are all matching for the JSON contents of the file. Everything that isn’t a valid Project or doesn’t match the filters will be ignored. (This function is going to be used in this first version and in the final version of the program.)

The other two functions above, mainStreamingConduit and sourceFilesFilter, initiate the Conduit by taking a ProjectFilter and a path to a system directory and then use C.sourceDirectoryDeep to walk the whole directory tree yielding each file they find in turn to the parseProject function. Then for each match yielded by parseProject, the mainStreamingConduit just prints it to the screen.

For 18 million, 10MB-12MB JSON files this version of the program had bounded memory usage (nice!) but it took a long time. I recall it taking hours to complete, and I wanted it to go faster.

Side note, this module also contained a function for making an absolute path out of a relative path on posix systems, which I’m going to use later and which I took almost entirely from a SchoolOfHaskell tutorial:

absolutize :: String -> IO String
absolutize aPath
    | "/" `isPrefixOf` aPath = pure aPath
    | "~" `isPrefixOf` aPath = do
        homePath <- getHomeDirectory
        pure $ normalise $ addTrailingPathSeparator homePath
                             ++ tail aPath
    | otherwise = do
        pathMaybeWithDots <- absolute_path aPath
        pure $ fromJust $ guess_dotdot pathMaybeWithDots

Conduit, STM-Conduit, Channels

One of the things I love about Haskell are the concurrency tools you get in the language. There’s an absolutely excellent introduction to this, in particular for people familiar with Go, at the site A Tour of Go in Haskell, but for even more inspiration, I recommend the book Parallel and Concurrent Programming in Haskell by Simon Marlow.

In order to speed up my Project-filtering program, I decided to create a queue, send all filenames into the queue, and then have a bunch of workers processing objects from the queue and printing to the screen in turn.

It did not take a lot of code to make this change:

-- Multiple-threaded version of this program using channels from `stm-conduit`
-- `Int` is number of threads to spin up
mainSTMConduit :: Int -> ProjectFilter -> FilePath -> IO ()
mainSTMConduit nrWorkers filterFunc searchDir = do
  children <- newMVar []
  inChan <- atomically $ STMChan.newTBMChan 16
  _ <- forkIO . C.runResourceT $ do
         _ <- register $ atomically $ STMChan.closeTBMChan inChan
         C.runConduitRes $ C.sourceDirectoryDeep False searchDir .| STMChan.sinkTBMChan inChan True
  forM_ [1..nrWorkers] (_ -> forkChild children $ runConduitChan inChan filterFunc)
  waitForChildren children
  return ()

This first creates a TBMChan, after which it forks off a process to load all of the filenames into the channel. The channel is bounded, though, which means that writing to the channel will automatically apply back-pressure.

(Note: this version of the code used the following versions: stm-conduit 3.0.0 and conduit 1.12.1; the next version, stm-conduit 4.0.0, automatically closes the channel, so the boolean argument here is no longer needed: STMChan.sinkTBMChan inChan True).

Just beyond that channel-loading code, we fork a bunch of child processes (up to nrWorkers amount), which all consume from the channel, running the parseProject function and printing the results, which all happens under the function runConduitChan:

runConduitChan :: STMChan.TBMChan FilePath -> ProjectFilter -> IO ()
runConduitChan inChan filterFunc = do
  C.runConduitRes $
       STMChan.sourceTBMChan inChan
       .| parseProject filterFunc
       .| C.mapM_C (liftIO . putStrLn)

forkChild and waitForChildren where inspired by work discovered elsewhere, but we need them to be able to spin off some workers and to actually understand when the whole process (all forked children) are finished:

waitForChildren :: MVar [MVar ()] -> IO ()
waitForChildren children = do
  cs <- takeMVar children
  case cs of
    []   -> return ()
    m:ms -> do
      putMVar children ms
      takeMVar m
      waitForChildren children

forkChild :: MVar [MVar ()] -> IO () -> IO ThreadId
forkChild children io = do
  mvar <- newEmptyMVar
  childs <- takeMVar children
  putMVar children (mvar:childs)
  forkFinally io (_ -> putMVar mvar ())

This version ran in a fraction of the time (depending on the input-directory size): I remember it scanning around 400,000 files (the number of files relating to a single Project) in about a minute and all projects in about 20 minutes. After making this change, I could often run the program and wait for it to complete instead of checking on it later in the day.

Running This Thing

To finalize this project, I needed a main function would could take some arguments:

{-# LANGUAGE DeriveGeneric     #-}
{-# LANGUAGE OverloadedStrings #-}

module Main where

import           Conduit                      ((.|))
import qualified Conduit                      as C
import           Control.Concurrent
import           Control.Monad                (forM_)
import           Control.Monad.IO.Class       (liftIO)
import           Control.Concurrent.STM
import           Control.Monad.Trans.Resource (register)
import qualified Data.Conduit.TMChan          as STMChan
import qualified Data.Text                    as T
import           Options.Generic
import           System.Directory            (doesDirectoryExist)
import           System.Exit

import           MyProjects

data Commands =
  Commands { searchPath  :: String
           , par         :: Maybe Int
           , name        :: Maybe T.Text
           , rev        :: Maybe T.Text
           , dataType    :: Maybe T.Text
           , key         :: Maybe T.Text
           , value       :: Maybe T.Text
           } deriving (Generic, Show)

instance ParseRecord Commands

parseArbKeyVal :: Commands -> Maybe ArbitraryKeyValue
parseArbKeyVal options =
  let arb = key options
      val = value options
  in case arb of
    Just key' -> Just (ArbitraryKeyValue key' val)
    Nothing -> Nothing

makeProjectFilter :: Commands -> ProjectFilter
makeProjectFilter options =
  let stdFilts = StdProjectFilters
        (Name <$> name options)
        (Revision <$> rev options)
        (DType <$> dataType options)
        (parseArbKeyVal options)
  in makeProjectFilters stdFilts


main :: IO ()
main = do
  options <- getRecord "Search My Project JSON Files"
  -- Would user like to run in parallel?
  let runner = case par options of
        Just nrWorkers -> mainSTMConduit nrWorkers
        Nothing -> mainStreamingConduit

  -- necessary things to search files: search path, filters to use, search dir exists
  let filterFunc = makeProjectFilter options
  searchDir <- absolutize (searchPath options)
  itExists <- doesDirectoryExist searchDir

  -- Run it if it exists
  case itExists of
    False -> putStrLn "Search Directory does not exist" >> exitWith (ExitFailure 1)
    True -> runner filterFunc searchDir

And that’s the whole program. My main function has some options: par, for deciding to usie the stm-conduit channels version or the first version, as well as some standard filters, Name and Revision. Then, after checking whether or not the input directory exists, it sets off.

Conclusion

This was a successful piece of Haskelling for me because it solved a problem I had at work, and it solved it well. While the channels code is perhaps dense, it didn’t become a difficult-to-maintain mess after I decided to add concurrency to my program.

Here, as elsewhere, Haskell proved to have nice abstractions for concurrency, nice libraries, and a conciseness that I’ve never seen in any other languages that I’ve used.

Tags: haskell