{-# LANGUAGE ViewPatterns #-}
module Hermod.ReCon.Trace.Ingest
( Ingestor
, IngestMode(..)
, FailureMode(..)
, IngestorReader(readLineIngestor)
, mkIngestor
, mkIngestorReader
, ingestFileThreaded
) where
import Prelude hiding (Foldable (..))
import Control.Concurrent
import Control.Concurrent.Chan.Unagi as Unagi
import Control.Concurrent.STM.TVar
import Control.Exception
import Control.Monad (forM_, forever, unless, void, when)
import Control.Monad.STM (atomically)
import Data.Aeson (FromJSON (..), decodeStrict', withObject, (.:))
import Data.Bits (shiftL, xor)
import Data.ByteString.Char8 as ByteString (ByteString, hGetLine)
import Data.Char (ord)
import Data.Foldable (foldl')
import qualified Data.Map.Strict as Map
import Data.Maybe (isJust)
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime)
import Data.Time.Clock.POSIX
import Data.Word (Word64)
import System.Directory (doesFileExist)
import System.IO
import System.IO.Error (isEOFError)
-- | Shared state for merging timestamped lines coming from several files.
--
-- Lines are staged in 'ingInBuffer', ordered by 'LineKey'. The background
-- thread started by 'mkIngestor' periodically moves the lines that are older
-- than the retention window onto 'ingChanRef', from which readers created by
-- 'mkIngestorReader' consume them.
data Ingestor = Ingestor
  { ingRetentionMs :: !Int
    -- ^ Retention window in milliseconds (at least 100).
  , ingChanRef     :: !(InChan ByteString)
    -- ^ Write end of the channel that each reader duplicates.
  , ingInBuffer    :: !(TVar (Map.Map LineKey ByteString))
    -- ^ Lines not yet released, keyed by timestamp and source file.
  }

-- | Buffer key: the line's @"at"@ timestamp, paired with a hash of the path of
-- the file it was read from (to tell apart lines from different files).
type LineKey = (UTCTime, Word64)
-- | Where 'ingestFileThreaded' starts reading a file.
data IngestMode
  = FromFileStart -- ^ Read the file from its beginning.
  | FromFileEnd   -- ^ Seek to the current end first; only later lines are read.
  deriving (Eq, Show)

-- | How the reader thread forked by 'ingestFileThreaded' reacts to an exception.
data FailureMode
  = DieSilently       -- ^ Swallow the exception; the thread just ends.
  | RethrowExceptions -- ^ Rethrow it inside the reader thread.
  deriving (Eq, Show)
-- | A subscription to an 'Ingestor', obtained from 'mkIngestorReader'.
newtype IngestorReader = IngestorReader
  { readLineIngestor :: IO ByteString
    -- ^ Fetch the next released line (for readers built by
    -- 'mkIngestorReader' this is a blocking channel read).
  }

-- | The @"at"@ timestamp carried by a JSON trace line.
newtype Timestamp = Timestamp UTCTime
-- | Accepts a JSON object and reads its @"at"@ field as a 'UTCTime'; other
-- fields are ignored. Fails on non-objects or when @"at"@ is absent or does
-- not parse as a 'UTCTime'.
instance FromJSON Timestamp where
  parseJSON = withObject "Timestamp" $ \obj -> do
    stamp <- obj .: "at"
    pure (Timestamp stamp)
-- | Create an 'Ingestor' with a retention window of the given number of
-- milliseconds (values below 100 are raised to 100).
--
-- A background thread is forked which, once per retention window, removes
-- every buffered line whose timestamp is earlier than @now - retention@ and
-- writes those lines to the broadcast channel in ascending key order. No
-- 'ThreadId' is kept, so the thread cannot be stopped from outside.
mkIngestor :: Int -> IO Ingestor
mkIngestor requestedMs = do
  (broadcastIn, drainOut) <- Unagi.newChan
  buffer <- newTVarIO Map.empty
  let ingestor = Ingestor retentionMs broadcastIn buffer
  _ <- forkIO (releaseLoop drainOut ingestor)
  return ingestor
  where
    -- Effective retention window in milliseconds.
    retentionMs :: Int
    retentionMs = max 100 requestedMs

    retentionWindow :: NominalDiffTime
    retentionWindow = fromIntegral retentionMs / 1000

    -- Throw away everything currently readable from the original OutChan.
    -- Readers use 'Unagi.dupChan', so this end is only drained to stop the
    -- channel from retaining every line ever written.
    drainAll :: OutChan a -> IO ()
    drainAll chan = do
      (Element tryNext, _) <- Unagi.tryReadChan chan
      gotOne <- isJust <$> tryNext
      when gotOne (drainAll chan)

    releaseLoop :: OutChan a -> Ingestor -> IO b
    releaseLoop chan Ingestor{ingInBuffer = pending, ingChanRef = sink} =
      forever $ do
        threadDelay (1000 * retentionMs)
        cutoff <- posixSecondsToUTCTime . subtract retentionWindow <$> getPOSIXTime
        -- Atomically split off (and remove) the prefix of keys older than cutoff.
        !expired <- atomically $
          stateTVar pending (Map.spanAntitone (\(stamp, _) -> stamp < cutoff))
        Unagi.writeList2Chan sink (Map.elems expired)
        drainAll chan
-- | Attach a new, independent reader to the ingestor's broadcast channel.
--
-- The reader is backed by a fresh 'Unagi.dupChan' of the write end, so it
-- only observes lines written after this call.
mkIngestorReader :: Ingestor -> IO IngestorReader
mkIngestorReader Ingestor{ingChanRef = broadcast} = do
  !subscription <- Unagi.dupChan broadcast
  let nextLine = Unagi.readChan subscription
  return (IngestorReader nextLine)
-- | Tail the file at @fp@ on a background thread, staging every line that
-- decodes as a JSON object with an @"at"@ timestamp into the ingestor's
-- buffer.
--
-- * If the file does not exist, nothing is started and
--   @Just "ingestFileThreaded: file not found: <fp>"@ is returned.
-- * Otherwise a reader thread is forked and 'Nothing' is returned at once.
--   With 'FromFileEnd' the thread first seeks to the current end of the
--   file. It then polls every @max 20 (retention \`div\` 40)@ ms and reads
--   every line currently available. Lines that do not decode are skipped.
-- * With 'DieSilently' any exception ends the reader thread quietly. With
--   'RethrowExceptions' it is rethrown /inside the forked thread/, where
--   'forkIO' hands it to the uncaught-exception handler; it is never
--   delivered to the caller of this function.
--
-- Distinct lines from the same file that share a timestamp are all kept
-- (see @insertLine@); previously all but the last were silently overwritten.
ingestFileThreaded :: Ingestor -> FailureMode -> IngestMode -> FilePath -> IO (Maybe String)
ingestFileThreaded Ingestor{ingInBuffer, ingRetentionMs} failMode ingestMode fp =
  doesFileExist fp >>= \case
    False -> pure $ Just $ "ingestFileThreaded: file not found: " ++ fp
    True -> do
      void . forkIO $ handle onFailure thread
      pure Nothing
  where
    -- Rethrow unless the caller asked for the thread to die silently.
    onFailure :: SomeException -> IO ()
    onFailure ex = unless (failMode == DieSilently) (throwIO ex)

    -- Poll interval in microseconds: retention/40 ms, but never below 20 ms.
    pollingDelay :: Int
    pollingDelay = 1000 * max 20 (ingRetentionMs `div` 40)

    -- Tags this file's lines so equal timestamps from other files differ.
    filePathHash :: Word64
    filePathHash = djb2 fp

    thread :: IO r
    thread = withFile fp ReadMode $ \hdl -> do
      when (ingestMode == FromFileEnd) $ hSeek hdl SeekFromEnd 0
      forever $ do
        threadDelay pollingDelay
        ingestLines hdl

    -- Read lines until EOF, staging each decodable one; EOF ends this round,
    -- any other IO error propagates to 'onFailure'.
    ingestLines :: Handle -> IO ()
    ingestLines hdl = try (ByteString.hGetLine hdl) >>= \case
      Right line -> do
        -- NOTE(review): if a writer has not finished the last line yet,
        -- hGetLine may hand back the partial line; it would then fail to
        -- decode and be skipped (as would its remainder on the next poll).
        -- Confirm that writers always append whole lines.
        forM_ (decodeStrict' line) $ \(Timestamp ts) ->
          atomically $ modifyTVar' ingInBuffer $ insertLine ts line
        ingestLines hdl
      Left (err :: IOException)
        | isEOFError err -> pure ()
        | otherwise -> ioError err

    -- Stage @line@ under @(ts, filePathHash)@. If that key already holds a
    -- different line from this file, retry one picosecond later so neither
    -- line is lost and file order is kept. An identical line under the same
    -- key is left as is (same result as the plain overwrite it replaces).
    insertLine :: UTCTime -> ByteString -> Map.Map LineKey ByteString -> Map.Map LineKey ByteString
    insertLine ts line buffer =
      case Map.lookup key buffer of
        Nothing -> Map.insert key line buffer
        Just existing
          | existing == line -> buffer
          | otherwise -> insertLine (addUTCTime 1e-12 ts) line buffer
      where
        key = (ts, filePathHash)
-- | 64-bit djb2 hash (XOR variant: @h' = (h * 33) \`xor\` c@) of a string,
-- seeded with 5381; arithmetic wraps modulo 2^64. Used to tag buffered
-- lines with the file they came from.
djb2 :: [Char] -> Word64
djb2 = foldl' mix 5381
  where
    mix :: Word64 -> Char -> Word64
    mix h c = (h + (h `shiftL` 5)) `xor` fromIntegral (ord c)