{-# LANGUAGE ViewPatterns #-}

module Hermod.ReCon.Trace.Ingest
       ( Ingestor
       , IngestMode(..)
       , FailureMode(..)
       , IngestorReader(readLineIngestor)
       , mkIngestor
       , mkIngestorReader
       , ingestFileThreaded
       ) where

import           Prelude hiding (Foldable (..))

import           Control.Concurrent
import           Control.Concurrent.Chan.Unagi as Unagi
import           Control.Concurrent.STM.TVar
import           Control.Exception
import           Control.Monad (forM_, forever, unless, void, when)
import           Control.Monad.STM (atomically)
import           Data.Aeson (FromJSON (..), decodeStrict', withObject, (.:))
import           Data.Bits (shiftL, xor)
import           Data.ByteString.Char8 as ByteString (ByteString, hGetLine)
import           Data.Char (ord)
import           Data.Foldable (foldl')
import qualified Data.Map.Strict as Map
import           Data.Maybe (isJust)
import           Data.Time.Clock (NominalDiffTime, UTCTime)
import           Data.Time.Clock.POSIX
import           Data.Word (Word64)
import           System.Directory (doesFileExist)
import           System.IO
import           System.IO.Error (isEOFError)


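-- NOTES:
-- 1. The ingested files need to be written using line buffering.
--    The ingestor currently does not support reading or incrementally parsing
--    partial trace messages: a line that is only partly written when the file
--    is polled is read as two separate fragments, which will typically fail
--    to parse and therefore be skipped.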


-- | Shared state of a trace ingestor. Lines read from any number of files are
-- collected in 'ingInBuffer' and periodically moved, in timestamp order, onto
-- the broadcast channel 'ingChanRef' (see 'mkIngestor').
data Ingestor = Ingestor
  { ingRetentionMs :: !Int
    -- ^ How long (in ms) a line stays buffered before it is forwarded.
  , ingChanRef     :: !(InChan ByteString)
    -- ^ Write end of the broadcast channel; every reader gets its own read
    -- end via 'mkIngestorReader'.
  , ingInBuffer    :: !(TVar (Map.Map LineKey ByteString))
    -- ^ Lines not yet forwarded, keyed (and thus ordered) by timestamp.
  }

-- Key under which an ingested line is buffered: the line's trace timestamp,
-- plus a tag based on a hash of the source file path (see ingestFileThreaded).
-- Because the timestamp comes first, the map orders lines by timestamp. The
-- tag keeps lines from different ingested files apart in the unlikely case
-- their timestamps collide.
type LineKey = (UTCTime, Word64)

-- | Where in a file ingestion starts.
data IngestMode =
    FromFileStart
    -- ^ Read the file from its beginning.
  | FromFileEnd
    -- ^ Seek to the end of the file first; only lines appended afterwards
    -- are ingested.
  deriving (Show, Eq)

-- | What an ingestion thread started by 'ingestFileThreaded' does when it
-- fails with an exception.
data FailureMode =
    DieSilently
    -- ^ Swallow the exception; the thread simply ends.
  | RethrowExceptions
    -- ^ Rethrow the exception inside the ingestion thread. Since that thread
    -- is forked, this does not reach the caller of 'ingestFileThreaded'; it
    -- terminates the thread and is passed to the runtime's handler for
    -- uncaught exceptions in forked threads.
  deriving (Show, Eq)

-- | A blocking reader: each call to 'readLineIngestor' waits for, and returns,
-- the next line forwarded by the ingestor.
newtype IngestorReader = IngestorReader { readLineIngestor :: IO ByteString }

-- | The @"at"@ field of a JSON trace line; ingested lines are keyed by it.
newtype Timestamp = Timestamp UTCTime

-- | Accepts any JSON object that has an @"at"@ field parseable as 'UTCTime';
-- all other fields are ignored. Non-objects fail to parse.
instance FromJSON Timestamp where
  parseJSON = withObject "Timestamp" $ \o ->
    Timestamp <$> o .: "at"


-- | Create an ingestor with the given retention interval in ms. Values below
-- 100 are raised to 100.
--
-- Ingested lines remain buffered for that interval before being piped to the
-- queue for consumption. A background thread wakes up once per interval. It
-- forwards every buffered line whose timestamp lies more than one interval in
-- the past, sorted by timestamp. Any consumer therefore lags behind real time
-- by at least that interval.
mkIngestor :: Int -> IO Ingestor
mkIngestor (max 100 -> millisecs) = do
  (inChan, outChan) <- Unagi.newChan
  ingestor <- Ingestor millisecs inChan <$> newTVarIO Map.empty
  void . forkIO $
    go outChan ingestor
  pure ingestor
  where
    deltaT :: NominalDiffTime
    deltaT = fromIntegral millisecs / 1000

    -- Read and discard everything currently available on this read end.
    flushChan :: OutChan a -> IO ()
    flushChan outChan = do
      (Element hasNext, _) <- Unagi.tryReadChan outChan
      maybeNext <- hasNext
      when (isJust maybeNext) $
        flushChan outChan

    -- Process the in-buffer by removing everything older than the cutoff
    -- deltaT and writing it, sorted by timestamp, into the out-queue.
    go :: OutChan a -> Ingestor -> IO b
    go outChan Ingestor{ingChanRef, ingInBuffer} = forever $ do
      threadDelay $ millisecs * 1000
      now <- getPOSIXTime
      let cutoff = posixSecondsToUTCTime $ now - deltaT
      -- Split off (and remove from the buffer) the prefix older than cutoff;
      -- the key's first component is the timestamp, so the predicate is
      -- antitone as spanAntitone requires.
      !m <- atomically $
        stateTVar ingInBuffer $
          Map.spanAntitone ((< cutoff) . fst)
      Unagi.writeList2Chan ingChanRef $
        snd <$> Map.toAscList m
      -- We flush our own broadcast end of the channel immediately to not leak space
      flushChan outChan

-- | Create a reader with its own read end of the ingestor's broadcast
-- channel. It receives the lines forwarded after it was created.
-- For multiple / parallel consumers, each one requires its own IngestorReader.
mkIngestorReader :: Ingestor -> IO IngestorReader
mkIngestorReader Ingestor{ingChanRef} = do
  !newOutChan <- Unagi.dupChan ingChanRef
  pure $ IngestorReader $ Unagi.readChan newOutChan

-- | Start a background thread that follows the file at the given path and
-- inserts its timestamped lines into the ingestor's buffer.
--
-- Returns @Just errormessage@ if the thread couldn't be started (the file
-- does not exist), otherwise 'Nothing'. The thread polls the file every
-- @max 20 (retention / 40)@ ms and reads all lines available at that point.
-- Exceptions in the thread are handled according to the 'FailureMode'.
ingestFileThreaded :: Ingestor -> FailureMode -> IngestMode -> FilePath -> IO (Maybe String)
ingestFileThreaded Ingestor{ingInBuffer, ingRetentionMs} failMode ingestMode fp =
  doesFileExist fp >>= \case
    False -> pure $ Just $ "ingestFileThreaded: file not found: " ++ fp
    True -> do
      void . forkIO $
        handle (\(ex :: SomeException) -> unless (failMode == DieSilently) (throwIO ex))
          thread
      pure Nothing
  where
    -- In microseconds, as expected by threadDelay.
    pollingDelay :: Int
    pollingDelay = 1000 * max 20 (ingRetentionMs `div` 40)

    filePathHash :: Word64
    filePathHash = djb2 fp

    thread :: IO r
    thread = withFile fp ReadMode $ \hdl -> do
      when (ingestMode == FromFileEnd) $
        hSeek hdl SeekFromEnd 0
      forever $ do
        -- This whole polling logic exists to avoid lazy I/O
        threadDelay pollingDelay
        ingestLines hdl

    -- Read lines until EOF, buffering every line that carries a timestamp.
    ingestLines :: Handle -> IO ()
    ingestLines hdl = try (ByteString.hGetLine hdl) >>= \case

      -- There's no requirement for each and every line to be a trace event.
      -- While with the Haskell node this is almost always the case, it can't be
      -- a general assumption.
      -- If the line doesn't look like a timestamped JSON object, we skip it.
      Right line -> do
        forM_ (decodeStrict' line) $ \(Timestamp ts) ->
          atomically $ modifyTVar' ingInBuffer $
            insertUnique ts line
        ingestLines hdl

      -- Swallow EOF while polling for new input, rethrow anything else
      Left (err :: IOException)
        | isEOFError err -> pure ()
        | otherwise      -> ioError err

    -- Buffer a line under (ts, filePathHash). Lines of the same file can share
    -- a timestamp, and a plain Map.insert would then overwrite (i.e. drop) the
    -- earlier one. So if the key is already taken, probe the following tags
    -- until a free one is found. Later lines thus normally get higher tags,
    -- and are forwarded after earlier lines with the same timestamp.
    insertUnique :: UTCTime -> ByteString -> Map.Map LineKey ByteString -> Map.Map LineKey ByteString
    insertUnique ts line buf = Map.insert (freeKey filePathHash) line buf
      where
        freeKey tag
          | Map.member (ts, tag) buf = freeKey (tag + 1)
          | otherwise                = (ts, tag)

-- | The xor variant of the djb2 string hash over the characters' code points:
-- start from 5381, then for each character @h' = (h * 33) `xor` c@, wrapping
-- at 64 bits. Used to tag buffered lines with the file they came from.
djb2 :: String -> Word64
djb2 = foldl' step 5381
  where
    step :: Word64 -> Char -> Word64
    step acc c = ((acc `shiftL` 5) + acc) `xor` fromIntegral (ord c)


{-
-- example usage:
_testRunConsumer :: IO ()
_testRunConsumer = do
  ing <- mkIngestor 1000
  onErrorPrint =<< ingestFileThreaded ing DieSilently FromFileStart "log-small.txt"
  onErrorPrint =<< ingestFileThreaded ing DieSilently FromFileStart "something-else.txt"

  ingReader <- mkIngestorReader ing
  forever $
    readLineIngestor ingReader >>= print
  where
    onErrorPrint :: Maybe String -> IO ()
    onErrorPrint = mapM_ putStrLn
-}