Parsing Millions of Large JSON files in Haskell
11/1/2018
In one of my past jobs, I had a directory on a SAN with millions of JSON files of sizes in the 10MB to 12MB range. One of the side-tasks I undertook was building a program that could search through these files for particular matching values, but I wanted it to be quick, so it really wasn’t going to work in Python, which is usually my most socially acceptable form of coding.
Enter Haskell, Enter Aeson
Haskell is a language that I enjoy writing in my free time (this site, for example, is written in Haskell), and it’s not a bad language for scripting, especially when you require raw performance. It does have an intimidating learning curve, however. (In my own case, it took probably four years of Haskell hobbying before I could write the program I’m about to describe.) At any rate, for this project I decided to try to Haskell my way out of it.
For JSON in particular, there’s a Haskell library known as aeson, which is famous for being fast and reasonable. They even use it at Facebook to parse huge amounts of JSON insanely quickly.
Now, the JSON documents I was parsing are things I’ll refer to as projects, and they were submitted by end-users to be ingested by a Java application. Sometimes things would go wrong, and it would be worth it to quickly filter these files to find the document that caused the problem.
Here’s an example type in a module called MyProjects.Types, as well as a FromJSON instance definition for my type:
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TemplateHaskell #-}
module MyProjects.Types where
import Data.Aeson
import Data.Int (Int64)
import Control.Lens
import qualified Data.HashMap.Strict as HM
import qualified Data.Text as T
import GHC.Generics
data Project =
  Project { _projName :: !Name
          , _projRevision :: !Revision
          , _dType :: !DType
          , _epochTimestamp :: !Int64
          , _original :: HM.HashMap T.Text Value
          } deriving (Generic, Show, Eq)
instance FromJSON Project where
  parseJSON = withObject "someProj" $ \o -> do
    _projName <- o .: "NAME"
    _projRevision <- o .: "REVISION"
    _dType <- o .: "TYPE"
    _epochTimestamp <- o .: "TIMESTAMP"
    -- keep the raw object around (Object is a HashMap Text Value in aeson < 2.0)
    let _original = o
    return Project{..}
Because I sometimes needed access to the original, full JSON value, I also stashed that on my record under the field _original. Here, I wanted to specify things like _projName and _projRevision as specific types, Name and Revision respectively, even though they’re just text values, so that I could treat them differently and never get them mixed up within a function that needs to take a Name and a Revision as parameters. This is one thing that Haskell is great for.
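For instance, the compiler will reject any call where the two arguments get swapped. A minimal sketch (the describeProject helper here is hypothetical, not part of the real program):
-- Hypothetical helper that takes a Name and a Revision, in that order
describeProject :: Name -> Revision -> T.Text
describeProject (Name n) (Revision r) = n <> "@" <> r

-- describeProject (Name "alpha") (Revision "2")   -- compiles
-- describeProject (Revision "2") (Name "alpha")   -- rejected at compile time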
However, these custom types will also need their own FromJSON instances:
newtype Name = Name T.Text deriving (Show, Eq)
newtype Revision = Revision T.Text deriving (Show, Eq)
newtype DType = DType T.Text deriving (Show, Eq)
data ArbitraryKeyValue = ArbitraryKeyValue T.Text (Maybe T.Text) deriving (Show, Eq)
instance FromJSON Name where
  parseJSON (String v) = pure (Name v)
  parseJSON _ = fail "expected a String value for Name"

instance FromJSON Revision where
  parseJSON (String v) = pure (Revision v)
  parseJSON _ = fail "expected a String value for Revision"

instance FromJSON DType where
  parseJSON (String v) = pure (DType v)
  parseJSON _ = fail "expected a String value for DType"
makeLenses ''Project
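These instances are enough to decode the wrappers straight from JSON string values. For example, in GHCi (hypothetical input):
-- >>> decode "\"1.2.0\"" :: Maybe Revision
-- Just (Revision "1.2.0")
-- >>> decode "42" :: Maybe Revision
-- Nothing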
Searching, Filter Functions, and Monoids
After I could deserialize one of these JSON documents into a type that my program understands, I needed to figure out how to filter a massive number of these files. Because my reason for filtering was that something unforeseen had gone wrong with the ingesting application, my filters could involve any arbitrary keys and values: I wouldn’t know beforehand what types of values I’d be looking for in my documents. These files were also scattered around the filesystem in a directory hierarchy organized by timestamp down to the minute, like so: YYYY/MM/DD/HH/mm, so sometimes it was useful to find all files for a particular project name. That’s what I called a common filter, but otherwise I could be tasked with seeing whether a key was present or whether a value at a key matched a particular value I had in mind.
For filtering then, I decided to define a filter-type that looked like this:
{-# LANGUAGE TemplateHaskell #-}
module MyProjects.Filters where
import Control.Lens
import Data.Aeson
import qualified Data.HashMap.Strict as HM
import Data.Monoid
import qualified Data.ByteString as B
import MyProjects.Types
newtype ProjectFilter = ProjectFilter { filter_ :: Project -> Bool }
The idea is that a filter is a function from Project -> Bool, which is actually the canonical example given for the Contravariant typeclass. I knew about Contravariant at the time, but I instead wanted to be able to arbitrarily combine ProjectFilters into one larger one, which sounded like a simple monoid to me, so I went with that.
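For reference, the canonical example alluded to above is Predicate from Data.Functor.Contravariant, whose definition looks like this (reproduced only for comparison; I didn’t end up using it):
newtype Predicate a = Predicate { getPredicate :: a -> Bool }

-- contramap adapts a predicate on b into a predicate on a
instance Contravariant Predicate where
  contramap f (Predicate p) = Predicate (p . f)
At any rate, here are the instances I went with for ProjectFilter: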
-- Since base 4.11, Semigroup is a superclass of Monoid, so the
-- combining operation lives in a Semigroup instance
instance Semigroup ProjectFilter where
  ProjectFilter f <> ProjectFilter g = ProjectFilter $ \project -> f project && g project

instance Monoid ProjectFilter where
  mempty = ProjectFilter (const True)
  mappend = (<>)
A Monoid has an identity element and a way to combine two values. In this case, the identity element is itself a function from Project -> Bool, so we’ll just use const True. To combine two of these functions together, we simply run the left and then the right and && the results. Thus, we’ve taken two Project -> Bool functions and rolled them into one Project -> Bool function, and either side could be mempty.
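A quick illustration of the combining behavior (the two trivial filters here are hypothetical):
alwaysPass, alwaysFail :: ProjectFilter
alwaysPass = ProjectFilter (const True)
alwaysFail = ProjectFilter (const False)

-- filter_ (alwaysPass <> alwaysFail) proj == False   -- both sides must match
-- filter_ (mempty <> alwaysPass) proj    == True     -- mempty never vetoes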
After that, I defined some specific, commonly-used filter functions:
validProject :: B.ByteString -> Maybe Project
validProject = decodeStrict

matchName :: Name -> ProjectFilter
matchName name' = ProjectFilter $ \project -> project ^. projName == name'

matchRevision :: Revision -> ProjectFilter
matchRevision rev = ProjectFilter $ \project -> project ^. projRevision == rev

matchDataType :: DType -> ProjectFilter
matchDataType dType' = ProjectFilter $ \project -> project ^. dType == dType'

matchNameRevision :: Name -> Revision -> ProjectFilter
matchNameRevision name rev = matchName name <> matchRevision rev

matchArbKeyValue :: ArbitraryKeyValue -> ProjectFilter
matchArbKeyValue (ArbitraryKeyValue key Nothing) = ProjectFilter $ \project -> HM.member key (project ^. original)
matchArbKeyValue (ArbitraryKeyValue key (Just val)) = ProjectFilter $ \project ->
  case HM.lookup key (project ^. original) of
    Nothing -> False
    Just someVal -> someVal == String val
What I really wanted was to pass a collection of filters around with the values I was looking for and have all the necessary ones be run against any input JSON Value, so I created a StdProjectFilters record:
data StdProjectFilters =
  StdProjectFilters {
    _nm :: Maybe Name
  , _rev :: Maybe Revision
  , _dataTyp :: Maybe DType
  , _arbKeyVal :: Maybe ArbitraryKeyValue
  } deriving (Eq, Show)
makeLenses ''StdProjectFilters
After that, I made a function which allowed me to take a StdProjectFilters value and collapse all the desired values we’re filtering for into one ProjectFilter using my Monoid instance defined above (<> is an alias for mappend):
makeProjectFilters :: StdProjectFilters -> ProjectFilter
makeProjectFilters stdFilts = nameF <> revF <> datatypF <> arbKVF
  where
    nameF = case stdFilts ^. nm of
      Nothing -> mempty
      Just nm' -> matchName nm'
    -- note: this binding can't be called `rev`, or it would shadow the lens
    revF = case stdFilts ^. rev of
      Nothing -> mempty
      Just rv -> matchRevision rv
    datatypF = case stdFilts ^. dataTyp of
      Nothing -> mempty
      Just dtyp -> matchDataType dtyp
    arbKVF = case stdFilts ^. arbKeyVal of
      Nothing -> mempty
      Just arbKeyV -> matchArbKeyValue arbKeyV
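To make that collapsing concrete, here’s a hypothetical example of building one combined filter from partial criteria (assuming OverloadedStrings for the text literals):
-- Match projects named "alpha" at revision "2"; ignore type and arbitrary keys
demoFilter :: ProjectFilter
demoFilter = makeProjectFilters $
  StdProjectFilters (Just (Name "alpha")) (Just (Revision "2")) Nothing Nothing

-- filter_ demoFilter someProject :: Bool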
Okay, that’s a decent amount of types and filters, and there are probably ways to condense it all. Still, using the above, I can create a single ProjectFilter out of a potential collection of arguments by mappending together a bunch of optional filters into one combined filter-function.
Conduit, Parsing and Such
For the next part of this project, I decided to use Conduit in order to stream my filter results with bounded memory usage. My plan with this program was to just dump matching filenames out to stdout, so Conduit is a nice choice because loading millions of 10MB–12MB JSON files could lead to some tricky space issues in Haskell if done poorly.
My first version of this code was not multi-threaded and looked like this:
module Search where

import Conduit ((.|))
import qualified Conduit as C
import qualified Data.Conduit as C
import Control.Monad
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Resource (MonadResource)
import qualified Data.ByteString as B
import Data.List (isPrefixOf)
import Data.Maybe (fromJust, isJust)
import System.Path.NameManip (guess_dotdot, absolute_path)
import System.FilePath (addTrailingPathSeparator, normalise)
import System.Directory (getHomeDirectory)

import MyProjects.Filters

-- Single-threaded version with bounded memory usage
mainStreamingConduit :: ProjectFilter -> FilePath -> IO ()
mainStreamingConduit filterFunc searchDir = do
  C.runConduitRes $
    sourceFilesFilter filterFunc searchDir .| C.mapM_C (liftIO . putStrLn)

sourceFilesFilter :: (MonadResource m, MonadIO m) => ProjectFilter -> FilePath -> C.Source m String
sourceFilesFilter projFilter dirname' =
  C.sourceDirectoryDeep False dirname'
    .| parseProject projFilter

parseProject :: (MonadResource m, MonadIO m) => ProjectFilter -> C.Conduit FilePath m String
parseProject (ProjectFilter filterFunc) = do
  C.awaitForever go
  where
    go path' = do
      bytes <- liftIO $ B.readFile path'
      let isProj = validProject bytes
      when (isJust isProj) $ do
        let proj' = fromJust isProj
        when (filterFunc proj') $ C.yield path'
I think of Conduit as coroutines with smart resource-handling bolted on, which you can kind of see in parseProject. This function is initialized with a ProjectFilter and becomes a Conduit that takes FilePath values as input while yielding Strings as output. These String outputs are simply filenames where the JSON parses as a valid Project and the input filters all match the JSON contents of the file. Everything that isn’t a valid Project or doesn’t match the filters will be ignored. (This function is going to be used both in this first version and in the final version of the program.)
The other two functions above, mainStreamingConduit and sourceFilesFilter, initiate the Conduit by taking a ProjectFilter and a path to a system directory, and then use C.sourceDirectoryDeep to walk the whole directory tree, yielding each file they find in turn to the parseProject function. Then, for each match yielded by parseProject, mainStreamingConduit just prints it to the screen.
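Running this single-threaded version from GHCi looks something like this (the filter and directory are hypothetical):
-- Print every file under /data/projects whose JSON parses as a Project named "alpha"
-- mainStreamingConduit (matchName (Name "alpha")) "/data/projects"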
For 18 million 10MB–12MB JSON files, this version of the program had bounded memory usage (nice!), but it took a long time. I recall it taking hours to complete, and I wanted it to go faster.
As a side note, this module also contained a function for making an absolute path out of a relative path on POSIX systems, which I’m going to use later and which I took almost entirely from a School of Haskell tutorial:
absolutize :: String -> IO String
absolutize aPath
  | "/" `isPrefixOf` aPath = pure aPath
  | "~" `isPrefixOf` aPath = do
      homePath <- getHomeDirectory
      pure $ normalise $ addTrailingPathSeparator homePath ++ tail aPath
  | otherwise = do
      pathMaybeWithDots <- absolute_path aPath
      pure $ fromJust $ guess_dotdot pathMaybeWithDots
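A few hypothetical inputs show what it does:
-- absolutize "/tmp/x"    -- "/tmp/x" (already absolute, unchanged)
-- absolutize "~/data"    -- e.g. "/home/me/data", expanding the home directory
-- absolutize "./a/../b"  -- an absolute path ending in "b", with the dots resolved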
Conduit, STM-Conduit, Channels
One of the things I love about Haskell is the set of concurrency tools you get in the language. There’s an absolutely excellent introduction to this, in particular for people familiar with Go, at the site A Tour of Go in Haskell, but for even more inspiration, I recommend the book Parallel and Concurrent Programming in Haskell by Simon Marlow.
In order to speed up my Project-filtering program, I decided to create a queue, send all filenames into the queue, and then have a bunch of workers processing objects from the queue and printing to the screen in turn.
It did not take a lot of code to make this change:
-- Multi-threaded version of this program using channels from `stm-conduit`
-- `Int` is the number of worker threads to spin up
mainSTMConduit :: Int -> ProjectFilter -> FilePath -> IO ()
mainSTMConduit nrWorkers filterFunc searchDir = do
  children <- newMVar []
  inChan <- atomically $ STMChan.newTBMChan 16
  _ <- forkIO . C.runResourceT $ do
    _ <- register $ atomically $ STMChan.closeTBMChan inChan
    C.runConduitRes $ C.sourceDirectoryDeep False searchDir .| STMChan.sinkTBMChan inChan True
  forM_ [1..nrWorkers] (\_ -> forkChild children $ runConduitChan inChan filterFunc)
  waitForChildren children
  return ()
This first creates a TBMChan (a bounded, closable STM channel), after which it forks off a thread to load all of the filenames into the channel. The channel is bounded, though, which means that writing to a full channel will automatically apply back-pressure.
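Here’s a minimal sketch of that back-pressure behavior using the underlying stm-chans API directly (the capacity and values are arbitrary):
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan

demoBackpressure :: IO ()
demoBackpressure = do
  chan <- atomically $ newTBMChan 2     -- room for two elements
  atomically $ writeTBMChan chan "a"
  atomically $ writeTBMChan chan "b"
  -- A third write would block (STM retry) until a reader frees a slot:
  -- atomically $ writeTBMChan chan "c"
  atomically $ closeTBMChan chan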
(Note: this version of the code used the following versions: stm-conduit 3.0.0 and conduit 1.12.1; the next version, stm-conduit 4.0.0, automatically closes the channel, so the boolean argument here, STMChan.sinkTBMChan inChan True, is no longer needed.)
Just beyond that channel-loading code, we fork a bunch of child threads (nrWorkers of them), which all consume from the channel, running the parseProject function and printing the results; this all happens inside the function runConduitChan:
runConduitChan :: STMChan.TBMChan FilePath -> ProjectFilter -> IO ()
runConduitChan inChan filterFunc = do
C.runConduitRes $
STMChan.sourceTBMChan inChan
.| parseProject filterFunc
.| C.mapM_C (liftIO . putStrLn)
forkChild and waitForChildren were inspired by work discovered elsewhere, but we need them to be able to spin off some workers and to actually know when the whole process (all forked children) is finished:
waitForChildren :: MVar [MVar ()] -> IO ()
waitForChildren children = do
  cs <- takeMVar children
  case cs of
    [] -> return ()
    m:ms -> do
      putMVar children ms
      takeMVar m          -- blocks until this child signals completion
      waitForChildren children

forkChild :: MVar [MVar ()] -> IO () -> IO ThreadId
forkChild children io = do
  mvar <- newEmptyMVar
  childs <- takeMVar children
  putMVar children (mvar:childs)
  forkFinally io (\_ -> putMVar mvar ())  -- signal completion, even on exception
This version ran in a fraction of the time (depending on the input-directory size): I remember it scanning around 400,000 files (the number of files relating to a single Project) in about a minute, and all projects in about 20 minutes. After making this change, I could often run the program and wait for it to complete instead of checking on it later in the day.
Running This Thing
To finalize this project, I needed a main function that could take some arguments:
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Conduit ((.|))
import qualified Conduit as C
import Control.Concurrent
import Control.Monad (forM_)
import Control.Monad.IO.Class (liftIO)
import Control.Concurrent.STM
import Control.Monad.Trans.Resource (register)
import qualified Data.Conduit.TMChan as STMChan
import qualified Data.Text as T
import Options.Generic
import System.Directory (doesDirectoryExist)
import System.Exit
import MyProjects.Filters
import MyProjects.Types
import Search
data Commands =
Commands { searchPath :: String
, par :: Maybe Int
, name :: Maybe T.Text
, rev :: Maybe T.Text
, dataType :: Maybe T.Text
, key :: Maybe T.Text
, value :: Maybe T.Text
} deriving (Generic, Show)
instance ParseRecord Commands
parseArbKeyVal :: Commands -> Maybe ArbitraryKeyValue
parseArbKeyVal options =
let arb = key options
val = value options
in case arb of
Just key' -> Just (ArbitraryKeyValue key' val)
Nothing -> Nothing
makeProjectFilter :: Commands -> ProjectFilter
makeProjectFilter options =
let stdFilts = StdProjectFilters
(Name <$> name options)
(Revision <$> rev options)
(DType <$> dataType options)
(parseArbKeyVal options)
in makeProjectFilters stdFilts
main :: IO ()
main = do
options <- getRecord "Search My Project JSON Files"
-- Would user like to run in parallel?
let runner = case par options of
Just nrWorkers -> mainSTMConduit nrWorkers
Nothing -> mainStreamingConduit
-- necessary things to search files: search path, filters to use, search dir exists
let filterFunc = makeProjectFilter options
searchDir <- absolutize (searchPath options)
itExists <- doesDirectoryExist searchDir
-- Run it if it exists
case itExists of
False -> putStrLn "Search Directory does not exist" >> exitWith (ExitFailure 1)
True -> runner filterFunc searchDir
And that’s the whole program. My main function has some options: par, for deciding whether to use the stm-conduit channels version or the first, single-threaded version, as well as some standard filters, like Name and Revision. Then, after checking whether or not the input directory exists, it sets off.
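Since options-generic derives the command-line interface from the Commands record, invoking the compiled binary looks roughly like this (the executable name search-projects is hypothetical; omitted Maybe flags simply stay Nothing):
$ search-projects --searchPath /data/projects --par 8 --name alpha
$ search-projects --searchPath /data/projects --key customer --value ACME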
Conclusion
This was a successful piece of Haskelling for me because it solved a problem I had at work, and it solved it well. While the channels code is perhaps dense, it didn’t become a difficult-to-maintain mess after I decided to add concurrency to my program.
Here, as elsewhere, Haskell proved to have nice abstractions for concurrency, nice libraries, and a conciseness that I’ve never seen in any other language I’ve used.
Tags: haskell