{-# OPTIONS_GHC -Wno-orphans #-}

module Hermod.ReCon.Trace.Feed(TemporalEvent(..), TemporalEventDurationMicrosec, read, readS) where

import           Cardano.Logging.Types.TraceMessage
import           Hermod.ReCon.Trace.Ingest (IngestorReader (..))

import           Prelude hiding (read)

import           Data.Aeson (throwDecodeStrict)
import qualified Data.ByteString.Char8 as BChar8
import qualified Data.Foldable as Foldable
import           Data.List (sortOn)
import           Data.Sequence (Seq, (|>))
import           Data.Time.Clock (UTCTime)
import           Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
import           Data.Word (Word64)

import           Streaming

utcToMicroseconds :: UTCTime -> Word64
utcToMicroseconds :: UTCTime -> Word64
utcToMicroseconds UTCTime
utcTime = POSIXTime -> Word64
forall b. Integral b => POSIXTime -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (POSIXTime -> Word64) -> POSIXTime -> Word64
forall a b. (a -> b) -> a -> b
$ UTCTime -> POSIXTime
utcTimeToPOSIXSeconds UTCTime
utcTime POSIXTime -> POSIXTime -> POSIXTime
forall a. Num a => a -> a -> a
* POSIXTime
1000000

-- | Temporal event represents multiple trace messages spanning some duration of time together with an index of the event.
data TemporalEvent = TemporalEvent {
  -- | Microseconds since epoch when the event begins.
  TemporalEvent -> Word64
beg      :: Word64,
  TemporalEvent -> [TraceMessage]
messages :: [TraceMessage]
} deriving (Int -> TemporalEvent -> ShowS
[TemporalEvent] -> ShowS
TemporalEvent -> String
(Int -> TemporalEvent -> ShowS)
-> (TemporalEvent -> String)
-> ([TemporalEvent] -> ShowS)
-> Show TemporalEvent
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> TemporalEvent -> ShowS
showsPrec :: Int -> TemporalEvent -> ShowS
$cshow :: TemporalEvent -> String
show :: TemporalEvent -> String
$cshowList :: [TemporalEvent] -> ShowS
showList :: [TemporalEvent] -> ShowS
Show, TemporalEvent -> TemporalEvent -> Bool
(TemporalEvent -> TemporalEvent -> Bool)
-> (TemporalEvent -> TemporalEvent -> Bool) -> Eq TemporalEvent
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: TemporalEvent -> TemporalEvent -> Bool
== :: TemporalEvent -> TemporalEvent -> Bool
$c/= :: TemporalEvent -> TemporalEvent -> Bool
/= :: TemporalEvent -> TemporalEvent -> Bool
Eq, Eq TemporalEvent
Eq TemporalEvent =>
(TemporalEvent -> TemporalEvent -> Ordering)
-> (TemporalEvent -> TemporalEvent -> Bool)
-> (TemporalEvent -> TemporalEvent -> Bool)
-> (TemporalEvent -> TemporalEvent -> Bool)
-> (TemporalEvent -> TemporalEvent -> Bool)
-> (TemporalEvent -> TemporalEvent -> TemporalEvent)
-> (TemporalEvent -> TemporalEvent -> TemporalEvent)
-> Ord TemporalEvent
TemporalEvent -> TemporalEvent -> Bool
TemporalEvent -> TemporalEvent -> Ordering
TemporalEvent -> TemporalEvent -> TemporalEvent
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: TemporalEvent -> TemporalEvent -> Ordering
compare :: TemporalEvent -> TemporalEvent -> Ordering
$c< :: TemporalEvent -> TemporalEvent -> Bool
< :: TemporalEvent -> TemporalEvent -> Bool
$c<= :: TemporalEvent -> TemporalEvent -> Bool
<= :: TemporalEvent -> TemporalEvent -> Bool
$c> :: TemporalEvent -> TemporalEvent -> Bool
> :: TemporalEvent -> TemporalEvent -> Bool
$c>= :: TemporalEvent -> TemporalEvent -> Bool
>= :: TemporalEvent -> TemporalEvent -> Bool
$cmax :: TemporalEvent -> TemporalEvent -> TemporalEvent
max :: TemporalEvent -> TemporalEvent -> TemporalEvent
$cmin :: TemporalEvent -> TemporalEvent -> TemporalEvent
min :: TemporalEvent -> TemporalEvent -> TemporalEvent
Ord)

-- | For performance considerations we group trace messages within the specified duration in one `TemporalEvent`.
type TemporalEventDurationMicrosec = Word

-- | Fill in one temporal event.
--   Returns the event, the starting time boundary of the next temporal event and the rest of the messages.
fill :: TemporalEventDurationMicrosec -> Seq TraceMessage -> Word64 -> [TraceMessage] -> (TemporalEvent, Word64, [TraceMessage])
fill :: TemporalEventDurationMicrosec
-> Seq TraceMessage
-> Word64
-> [TraceMessage]
-> (TemporalEvent, Word64, [TraceMessage])
fill TemporalEventDurationMicrosec
duration Seq TraceMessage
acc Word64
t (TraceMessage
x : [TraceMessage]
xs) | UTCTime -> Word64
utcToMicroseconds TraceMessage
x.tmsgAt  Word64 -> Word64 -> Bool
forall a. Ord a => a -> a -> Bool
<= Word64
t Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
+ TemporalEventDurationMicrosec -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral TemporalEventDurationMicrosec
duration = TemporalEventDurationMicrosec
-> Seq TraceMessage
-> Word64
-> [TraceMessage]
-> (TemporalEvent, Word64, [TraceMessage])
fill TemporalEventDurationMicrosec
duration (Seq TraceMessage
acc Seq TraceMessage -> TraceMessage -> Seq TraceMessage
forall a. Seq a -> a -> Seq a
|> TraceMessage
x) Word64
t [TraceMessage]
xs
fill TemporalEventDurationMicrosec
duration Seq TraceMessage
acc Word64
t [TraceMessage]
rest = (Word64 -> [TraceMessage] -> TemporalEvent
TemporalEvent Word64
t (Seq TraceMessage -> [TraceMessage]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Foldable.toList Seq TraceMessage
acc), Word64
t Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
+ TemporalEventDurationMicrosec -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral TemporalEventDurationMicrosec
duration, [TraceMessage]
rest)

-- | Slice up the trace messages into consequtive temporal events.
slice :: TemporalEventDurationMicrosec -> [TraceMessage] -> [TemporalEvent]
slice :: TemporalEventDurationMicrosec -> [TraceMessage] -> [TemporalEvent]
slice TemporalEventDurationMicrosec
_ [] = []
slice TemporalEventDurationMicrosec
duration msg_ :: [TraceMessage]
msg_@(TraceMessage
x : [TraceMessage]
_) = Word64 -> [TraceMessage] -> [TemporalEvent]
go (UTCTime -> Word64
utcToMicroseconds (TraceMessage -> UTCTime
tmsgAt TraceMessage
x)) [TraceMessage]
msg_ where
  go :: Word64 -> [TraceMessage] -> [TemporalEvent]
  go :: Word64 -> [TraceMessage] -> [TemporalEvent]
go Word64
_ [] = []
  go Word64
t [TraceMessage]
msg =
    let (TemporalEvent
e, !Word64
t', ![TraceMessage]
msg') = TemporalEventDurationMicrosec
-> Seq TraceMessage
-> Word64
-> [TraceMessage]
-> (TemporalEvent, Word64, [TraceMessage])
fill TemporalEventDurationMicrosec
duration Seq TraceMessage
forall a. Monoid a => a
mempty Word64
t [TraceMessage]
msg in
    TemporalEvent
e TemporalEvent -> [TemporalEvent] -> [TemporalEvent]
forall a. a -> [a] -> [a]
: Word64 -> [TraceMessage] -> [TemporalEvent]
go Word64
t' [TraceMessage]
msg'

-- | We assume its possible for the trace messages to come out of order. Remedy that here.
sortByTimestamp :: [TraceMessage] -> [TraceMessage]
sortByTimestamp :: [TraceMessage] -> [TraceMessage]
sortByTimestamp = (TraceMessage -> UTCTime) -> [TraceMessage] -> [TraceMessage]
forall b a. Ord b => (a -> b) -> [a] -> [a]
sortOn TraceMessage -> UTCTime
tmsgAt

-- | Read a text file where every line is a json object representation of a `TraceMessage`.
--   Trace messages lying within the specified `TemporalEventDurationMicrosec` are grouped in `TemporalEvent`.
--   The trace messages are sorted by timestamp before any action.
isCommentLine :: BChar8.ByteString -> Bool
isCommentLine :: ByteString -> Bool
isCommentLine ByteString
l = ByteString
"//" ByteString -> ByteString -> Bool
`BChar8.isPrefixOf` ByteString
l Bool -> Bool -> Bool
|| ByteString -> Bool
BChar8.null ByteString
l

read :: FilePath -> TemporalEventDurationMicrosec -> IO [TemporalEvent]
read :: String -> TemporalEventDurationMicrosec -> IO [TemporalEvent]
read String
filename TemporalEventDurationMicrosec
duration = do
  [ByteString]
traces <- (ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (Bool -> Bool
not (Bool -> Bool) -> (ByteString -> Bool) -> ByteString -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Bool
isCommentLine) ([ByteString] -> [ByteString])
-> (ByteString -> [ByteString]) -> ByteString -> [ByteString]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> [ByteString]
BChar8.lines (ByteString -> [ByteString]) -> IO ByteString -> IO [ByteString]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> String -> IO ByteString
BChar8.readFile String
filename
  [TraceMessage]
msgs <- [TraceMessage] -> [TraceMessage]
sortByTimestamp ([TraceMessage] -> [TraceMessage])
-> IO [TraceMessage] -> IO [TraceMessage]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (ByteString -> IO TraceMessage)
-> [ByteString] -> IO [TraceMessage]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse ByteString -> IO TraceMessage
forall a (m :: * -> *).
(FromJSON a, MonadThrow m) =>
ByteString -> m a
throwDecodeStrict [ByteString]
traces
  let events :: [TemporalEvent]
events = TemporalEventDurationMicrosec -> [TraceMessage] -> [TemporalEvent]
slice TemporalEventDurationMicrosec
duration [TraceMessage]
msgs
  [TemporalEvent] -> IO [TemporalEvent]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [TemporalEvent]
events

data TemporalEventBuilderSt = TemporalEventBuilderSt {
  -- | A message read from the file that hasn't been distributed yet (if any).
  TemporalEventBuilderSt -> Maybe TraceMessage
nextBuffered :: !(Maybe TraceMessage),
  -- | The timestamp of the beginning of the next issued temporal event.
  TemporalEventBuilderSt -> Word64
nextBeg      :: !Word64,
  -- | The accumulation of trace messages to be issued in the next issued temporal event.
  TemporalEventBuilderSt -> Seq TraceMessage
nextMsgs     :: !(Seq TraceMessage),
  -- | Whether the file of trace messages has ended.
  TemporalEventBuilderSt -> Bool
nextTerminal :: !Bool
}

readS :: IngestorReader -> TemporalEventDurationMicrosec -> Stream (Of TemporalEvent) IO ()
readS :: IngestorReader
-> TemporalEventDurationMicrosec -> Stream (Of TemporalEvent) IO ()
readS IngestorReader
ingestor TemporalEventDurationMicrosec
duration = do
  TraceMessage
firstMsg <- IO TraceMessage -> Stream (Of TemporalEvent) IO TraceMessage
forall (m :: * -> *) a.
Monad m =>
m a -> Stream (Of TemporalEvent) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IngestorReader
ingestor.readLineIngestor IO ByteString -> (ByteString -> IO TraceMessage) -> IO TraceMessage
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ByteString -> IO TraceMessage
forall a (m :: * -> *).
(FromJSON a, MonadThrow m) =>
ByteString -> m a
throwDecodeStrict)
  (TemporalEventBuilderSt
 -> IO (Either () (Of TemporalEvent TemporalEventBuilderSt)))
-> TemporalEventBuilderSt -> Stream (Of TemporalEvent) IO ()
forall (m :: * -> *) (f :: * -> *) s r.
(Monad m, Functor f) =>
(s -> m (Either r (f s))) -> s -> Stream f m r
unfold TemporalEventBuilderSt
-> IO (Either () (Of TemporalEvent TemporalEventBuilderSt))
go (TemporalEventBuilderSt -> Stream (Of TemporalEvent) IO ())
-> TemporalEventBuilderSt -> Stream (Of TemporalEvent) IO ()
forall a b. (a -> b) -> a -> b
$
   TemporalEventBuilderSt
     { nextBuffered :: Maybe TraceMessage
nextBuffered = TraceMessage -> Maybe TraceMessage
forall a. a -> Maybe a
Just TraceMessage
firstMsg
     , nextBeg :: Word64
nextBeg = UTCTime -> Word64
utcToMicroseconds TraceMessage
firstMsg.tmsgAt
     , nextMsgs :: Seq TraceMessage
nextMsgs = Seq TraceMessage
forall a. Monoid a => a
mempty
     , nextTerminal :: Bool
nextTerminal = Bool
False
     }
  where
    go :: TemporalEventBuilderSt
       -> IO (Either () (Of TemporalEvent TemporalEventBuilderSt))
    go :: TemporalEventBuilderSt
-> IO (Either () (Of TemporalEvent TemporalEventBuilderSt))
go TemporalEventBuilderSt{nextTerminal :: TemporalEventBuilderSt -> Bool
nextTerminal = Bool
True} = Either () (Of TemporalEvent TemporalEventBuilderSt)
-> IO (Either () (Of TemporalEvent TemporalEventBuilderSt))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either () (Of TemporalEvent TemporalEventBuilderSt)
forall a b. a -> Either a b
Left ())
    go TemporalEventBuilderSt{nextBuffered :: TemporalEventBuilderSt -> Maybe TraceMessage
nextBuffered = Maybe TraceMessage
Nothing, Bool
Word64
Seq TraceMessage
nextBeg :: TemporalEventBuilderSt -> Word64
nextMsgs :: TemporalEventBuilderSt -> Seq TraceMessage
nextTerminal :: TemporalEventBuilderSt -> Bool
nextBeg :: Word64
nextMsgs :: Seq TraceMessage
nextTerminal :: Bool
..} = do
      TraceMessage
msg <- IngestorReader -> IO ByteString
readLineIngestor IngestorReader
ingestor IO ByteString -> (ByteString -> IO TraceMessage) -> IO TraceMessage
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ByteString -> IO TraceMessage
forall a (m :: * -> *).
(FromJSON a, MonadThrow m) =>
ByteString -> m a
throwDecodeStrict
      TemporalEventBuilderSt
-> IO (Either () (Of TemporalEvent TemporalEventBuilderSt))
go (Maybe TraceMessage
-> Word64 -> Seq TraceMessage -> Bool -> TemporalEventBuilderSt
TemporalEventBuilderSt (TraceMessage -> Maybe TraceMessage
forall a. a -> Maybe a
Just TraceMessage
msg) Word64
nextBeg Seq TraceMessage
nextMsgs Bool
False)
    go TemporalEventBuilderSt{nextBuffered :: TemporalEventBuilderSt -> Maybe TraceMessage
nextBuffered = Just TraceMessage
msg, Bool
Word64
Seq TraceMessage
nextBeg :: TemporalEventBuilderSt -> Word64
nextMsgs :: TemporalEventBuilderSt -> Seq TraceMessage
nextTerminal :: TemporalEventBuilderSt -> Bool
nextBeg :: Word64
nextMsgs :: Seq TraceMessage
nextTerminal :: Bool
..} | UTCTime -> Word64
utcToMicroseconds TraceMessage
msg.tmsgAt Word64 -> Word64 -> Bool
forall a. Ord a => a -> a -> Bool
<= Word64
nextBeg Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
+ TemporalEventDurationMicrosec -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral TemporalEventDurationMicrosec
duration =
        TemporalEventBuilderSt
-> IO (Either () (Of TemporalEvent TemporalEventBuilderSt))
go (Maybe TraceMessage
-> Word64 -> Seq TraceMessage -> Bool -> TemporalEventBuilderSt
TemporalEventBuilderSt Maybe TraceMessage
forall a. Maybe a
Nothing Word64
nextBeg (Seq TraceMessage
nextMsgs Seq TraceMessage -> TraceMessage -> Seq TraceMessage
forall a. Seq a -> a -> Seq a
|> TraceMessage
msg) Bool
False)
    go TemporalEventBuilderSt{nextBuffered :: TemporalEventBuilderSt -> Maybe TraceMessage
nextBuffered = Just TraceMessage
msg, Bool
Word64
Seq TraceMessage
nextBeg :: TemporalEventBuilderSt -> Word64
nextMsgs :: TemporalEventBuilderSt -> Seq TraceMessage
nextTerminal :: TemporalEventBuilderSt -> Bool
nextBeg :: Word64
nextMsgs :: Seq TraceMessage
nextTerminal :: Bool
..} = Either () (Of TemporalEvent TemporalEventBuilderSt)
-> IO (Either () (Of TemporalEvent TemporalEventBuilderSt))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either () (Of TemporalEvent TemporalEventBuilderSt)
 -> IO (Either () (Of TemporalEvent TemporalEventBuilderSt)))
-> Either () (Of TemporalEvent TemporalEventBuilderSt)
-> IO (Either () (Of TemporalEvent TemporalEventBuilderSt))
forall a b. (a -> b) -> a -> b
$ Of TemporalEvent TemporalEventBuilderSt
-> Either () (Of TemporalEvent TemporalEventBuilderSt)
forall a b. b -> Either a b
Right (Of TemporalEvent TemporalEventBuilderSt
 -> Either () (Of TemporalEvent TemporalEventBuilderSt))
-> Of TemporalEvent TemporalEventBuilderSt
-> Either () (Of TemporalEvent TemporalEventBuilderSt)
forall a b. (a -> b) -> a -> b
$
      Word64 -> [TraceMessage] -> TemporalEvent
TemporalEvent Word64
nextBeg (Seq TraceMessage -> [TraceMessage]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Foldable.toList Seq TraceMessage
nextMsgs)
        TemporalEvent
-> TemporalEventBuilderSt
-> Of TemporalEvent TemporalEventBuilderSt
forall a b. a -> b -> Of a b
:>
      Maybe TraceMessage
-> Word64 -> Seq TraceMessage -> Bool -> TemporalEventBuilderSt
TemporalEventBuilderSt (TraceMessage -> Maybe TraceMessage
forall a. a -> Maybe a
Just TraceMessage
msg) (Word64
nextBeg Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
+ TemporalEventDurationMicrosec -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral TemporalEventDurationMicrosec
duration) Seq TraceMessage
forall a. Monoid a => a
mempty Bool
False