{-# OPTIONS_GHC -Wno-orphans #-}
module Hermod.ReCon.Trace.Feed(TemporalEvent(..), TemporalEventDurationMicrosec, read, readS) where
import Cardano.Logging.Types.TraceMessage
import Hermod.ReCon.Trace.Ingest (IngestorReader (..))
import Prelude hiding (read)
import Data.Aeson (throwDecodeStrict)
import qualified Data.ByteString.Char8 as BChar8
import qualified Data.Foldable as Foldable
import Data.List (sortOn)
import Data.Sequence (Seq, (|>))
import Data.Time.Clock (UTCTime)
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
import Data.Word (Word64)
import Streaming
utcToMicroseconds :: UTCTime -> Word64
utcToMicroseconds :: UTCTime -> Word64
utcToMicroseconds UTCTime
utcTime = POSIXTime -> Word64
forall b. Integral b => POSIXTime -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (POSIXTime -> Word64) -> POSIXTime -> Word64
forall a b. (a -> b) -> a -> b
$ UTCTime -> POSIXTime
utcTimeToPOSIXSeconds UTCTime
utcTime POSIXTime -> POSIXTime -> POSIXTime
forall a. Num a => a -> a -> a
* POSIXTime
1000000
data TemporalEvent = TemporalEvent {
TemporalEvent -> Word64
beg :: Word64,
TemporalEvent -> [TraceMessage]
messages :: [TraceMessage]
} deriving (Int -> TemporalEvent -> ShowS
[TemporalEvent] -> ShowS
TemporalEvent -> String
(Int -> TemporalEvent -> ShowS)
-> (TemporalEvent -> String)
-> ([TemporalEvent] -> ShowS)
-> Show TemporalEvent
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> TemporalEvent -> ShowS
showsPrec :: Int -> TemporalEvent -> ShowS
$cshow :: TemporalEvent -> String
show :: TemporalEvent -> String
$cshowList :: [TemporalEvent] -> ShowS
showList :: [TemporalEvent] -> ShowS
Show, TemporalEvent -> TemporalEvent -> Bool
(TemporalEvent -> TemporalEvent -> Bool)
-> (TemporalEvent -> TemporalEvent -> Bool) -> Eq TemporalEvent
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: TemporalEvent -> TemporalEvent -> Bool
== :: TemporalEvent -> TemporalEvent -> Bool
$c/= :: TemporalEvent -> TemporalEvent -> Bool
/= :: TemporalEvent -> TemporalEvent -> Bool
Eq, Eq TemporalEvent
Eq TemporalEvent =>
(TemporalEvent -> TemporalEvent -> Ordering)
-> (TemporalEvent -> TemporalEvent -> Bool)
-> (TemporalEvent -> TemporalEvent -> Bool)
-> (TemporalEvent -> TemporalEvent -> Bool)
-> (TemporalEvent -> TemporalEvent -> Bool)
-> (TemporalEvent -> TemporalEvent -> TemporalEvent)
-> (TemporalEvent -> TemporalEvent -> TemporalEvent)
-> Ord TemporalEvent
TemporalEvent -> TemporalEvent -> Bool
TemporalEvent -> TemporalEvent -> Ordering
TemporalEvent -> TemporalEvent -> TemporalEvent
forall a.
Eq a =>
(a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
$ccompare :: TemporalEvent -> TemporalEvent -> Ordering
compare :: TemporalEvent -> TemporalEvent -> Ordering
$c< :: TemporalEvent -> TemporalEvent -> Bool
< :: TemporalEvent -> TemporalEvent -> Bool
$c<= :: TemporalEvent -> TemporalEvent -> Bool
<= :: TemporalEvent -> TemporalEvent -> Bool
$c> :: TemporalEvent -> TemporalEvent -> Bool
> :: TemporalEvent -> TemporalEvent -> Bool
$c>= :: TemporalEvent -> TemporalEvent -> Bool
>= :: TemporalEvent -> TemporalEvent -> Bool
$cmax :: TemporalEvent -> TemporalEvent -> TemporalEvent
max :: TemporalEvent -> TemporalEvent -> TemporalEvent
$cmin :: TemporalEvent -> TemporalEvent -> TemporalEvent
min :: TemporalEvent -> TemporalEvent -> TemporalEvent
Ord)
type TemporalEventDurationMicrosec = Word
fill :: TemporalEventDurationMicrosec -> Seq TraceMessage -> Word64 -> [TraceMessage] -> (TemporalEvent, Word64, [TraceMessage])
fill :: TemporalEventDurationMicrosec
-> Seq TraceMessage
-> Word64
-> [TraceMessage]
-> (TemporalEvent, Word64, [TraceMessage])
fill TemporalEventDurationMicrosec
duration Seq TraceMessage
acc Word64
t (TraceMessage
x : [TraceMessage]
xs) | UTCTime -> Word64
utcToMicroseconds TraceMessage
x.tmsgAt Word64 -> Word64 -> Bool
forall a. Ord a => a -> a -> Bool
<= Word64
t Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
+ TemporalEventDurationMicrosec -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral TemporalEventDurationMicrosec
duration = TemporalEventDurationMicrosec
-> Seq TraceMessage
-> Word64
-> [TraceMessage]
-> (TemporalEvent, Word64, [TraceMessage])
fill TemporalEventDurationMicrosec
duration (Seq TraceMessage
acc Seq TraceMessage -> TraceMessage -> Seq TraceMessage
forall a. Seq a -> a -> Seq a
|> TraceMessage
x) Word64
t [TraceMessage]
xs
fill TemporalEventDurationMicrosec
duration Seq TraceMessage
acc Word64
t [TraceMessage]
rest = (Word64 -> [TraceMessage] -> TemporalEvent
TemporalEvent Word64
t (Seq TraceMessage -> [TraceMessage]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Foldable.toList Seq TraceMessage
acc), Word64
t Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
+ TemporalEventDurationMicrosec -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral TemporalEventDurationMicrosec
duration, [TraceMessage]
rest)
slice :: TemporalEventDurationMicrosec -> [TraceMessage] -> [TemporalEvent]
slice :: TemporalEventDurationMicrosec -> [TraceMessage] -> [TemporalEvent]
slice TemporalEventDurationMicrosec
_ [] = []
slice TemporalEventDurationMicrosec
duration msg_ :: [TraceMessage]
msg_@(TraceMessage
x : [TraceMessage]
_) = Word64 -> [TraceMessage] -> [TemporalEvent]
go (UTCTime -> Word64
utcToMicroseconds (TraceMessage -> UTCTime
tmsgAt TraceMessage
x)) [TraceMessage]
msg_ where
go :: Word64 -> [TraceMessage] -> [TemporalEvent]
go :: Word64 -> [TraceMessage] -> [TemporalEvent]
go Word64
_ [] = []
go Word64
t [TraceMessage]
msg =
let (TemporalEvent
e, !Word64
t', ![TraceMessage]
msg') = TemporalEventDurationMicrosec
-> Seq TraceMessage
-> Word64
-> [TraceMessage]
-> (TemporalEvent, Word64, [TraceMessage])
fill TemporalEventDurationMicrosec
duration Seq TraceMessage
forall a. Monoid a => a
mempty Word64
t [TraceMessage]
msg in
TemporalEvent
e TemporalEvent -> [TemporalEvent] -> [TemporalEvent]
forall a. a -> [a] -> [a]
: Word64 -> [TraceMessage] -> [TemporalEvent]
go Word64
t' [TraceMessage]
msg'
sortByTimestamp :: [TraceMessage] -> [TraceMessage]
sortByTimestamp :: [TraceMessage] -> [TraceMessage]
sortByTimestamp = (TraceMessage -> UTCTime) -> [TraceMessage] -> [TraceMessage]
forall b a. Ord b => (a -> b) -> [a] -> [a]
sortOn TraceMessage -> UTCTime
tmsgAt
isCommentLine :: BChar8.ByteString -> Bool
ByteString
l = ByteString
"//" ByteString -> ByteString -> Bool
`BChar8.isPrefixOf` ByteString
l Bool -> Bool -> Bool
|| ByteString -> Bool
BChar8.null ByteString
l
read :: FilePath -> TemporalEventDurationMicrosec -> IO [TemporalEvent]
read :: String -> TemporalEventDurationMicrosec -> IO [TemporalEvent]
read String
filename TemporalEventDurationMicrosec
duration = do
[ByteString]
traces <- (ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (Bool -> Bool
not (Bool -> Bool) -> (ByteString -> Bool) -> ByteString -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Bool
isCommentLine) ([ByteString] -> [ByteString])
-> (ByteString -> [ByteString]) -> ByteString -> [ByteString]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> [ByteString]
BChar8.lines (ByteString -> [ByteString]) -> IO ByteString -> IO [ByteString]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> String -> IO ByteString
BChar8.readFile String
filename
[TraceMessage]
msgs <- [TraceMessage] -> [TraceMessage]
sortByTimestamp ([TraceMessage] -> [TraceMessage])
-> IO [TraceMessage] -> IO [TraceMessage]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (ByteString -> IO TraceMessage)
-> [ByteString] -> IO [TraceMessage]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> [a] -> f [b]
traverse ByteString -> IO TraceMessage
forall a (m :: * -> *).
(FromJSON a, MonadThrow m) =>
ByteString -> m a
throwDecodeStrict [ByteString]
traces
let events :: [TemporalEvent]
events = TemporalEventDurationMicrosec -> [TraceMessage] -> [TemporalEvent]
slice TemporalEventDurationMicrosec
duration [TraceMessage]
msgs
[TemporalEvent] -> IO [TemporalEvent]
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure [TemporalEvent]
events
data TemporalEventBuilderSt = TemporalEventBuilderSt {
TemporalEventBuilderSt -> Maybe TraceMessage
nextBuffered :: !(Maybe TraceMessage),
TemporalEventBuilderSt -> Word64
nextBeg :: !Word64,
TemporalEventBuilderSt -> Seq TraceMessage
nextMsgs :: !(Seq TraceMessage),
TemporalEventBuilderSt -> Bool
nextTerminal :: !Bool
}
readS :: IngestorReader -> TemporalEventDurationMicrosec -> Stream (Of TemporalEvent) IO ()
readS :: IngestorReader
-> TemporalEventDurationMicrosec -> Stream (Of TemporalEvent) IO ()
readS IngestorReader
ingestor TemporalEventDurationMicrosec
duration = do
TraceMessage
firstMsg <- IO TraceMessage -> Stream (Of TemporalEvent) IO TraceMessage
forall (m :: * -> *) a.
Monad m =>
m a -> Stream (Of TemporalEvent) m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (IngestorReader
ingestor.readLineIngestor IO ByteString -> (ByteString -> IO TraceMessage) -> IO TraceMessage
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ByteString -> IO TraceMessage
forall a (m :: * -> *).
(FromJSON a, MonadThrow m) =>
ByteString -> m a
throwDecodeStrict)
(TemporalEventBuilderSt
-> IO (Either () (Of TemporalEvent TemporalEventBuilderSt)))
-> TemporalEventBuilderSt -> Stream (Of TemporalEvent) IO ()
forall (m :: * -> *) (f :: * -> *) s r.
(Monad m, Functor f) =>
(s -> m (Either r (f s))) -> s -> Stream f m r
unfold TemporalEventBuilderSt
-> IO (Either () (Of TemporalEvent TemporalEventBuilderSt))
go (TemporalEventBuilderSt -> Stream (Of TemporalEvent) IO ())
-> TemporalEventBuilderSt -> Stream (Of TemporalEvent) IO ()
forall a b. (a -> b) -> a -> b
$
TemporalEventBuilderSt
{ nextBuffered :: Maybe TraceMessage
nextBuffered = TraceMessage -> Maybe TraceMessage
forall a. a -> Maybe a
Just TraceMessage
firstMsg
, nextBeg :: Word64
nextBeg = UTCTime -> Word64
utcToMicroseconds TraceMessage
firstMsg.tmsgAt
, nextMsgs :: Seq TraceMessage
nextMsgs = Seq TraceMessage
forall a. Monoid a => a
mempty
, nextTerminal :: Bool
nextTerminal = Bool
False
}
where
go :: TemporalEventBuilderSt
-> IO (Either () (Of TemporalEvent TemporalEventBuilderSt))
go :: TemporalEventBuilderSt
-> IO (Either () (Of TemporalEvent TemporalEventBuilderSt))
go TemporalEventBuilderSt{nextTerminal :: TemporalEventBuilderSt -> Bool
nextTerminal = Bool
True} = Either () (Of TemporalEvent TemporalEventBuilderSt)
-> IO (Either () (Of TemporalEvent TemporalEventBuilderSt))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either () (Of TemporalEvent TemporalEventBuilderSt)
forall a b. a -> Either a b
Left ())
go TemporalEventBuilderSt{nextBuffered :: TemporalEventBuilderSt -> Maybe TraceMessage
nextBuffered = Maybe TraceMessage
Nothing, Bool
Word64
Seq TraceMessage
nextBeg :: TemporalEventBuilderSt -> Word64
nextMsgs :: TemporalEventBuilderSt -> Seq TraceMessage
nextTerminal :: TemporalEventBuilderSt -> Bool
nextBeg :: Word64
nextMsgs :: Seq TraceMessage
nextTerminal :: Bool
..} = do
TraceMessage
msg <- IngestorReader -> IO ByteString
readLineIngestor IngestorReader
ingestor IO ByteString -> (ByteString -> IO TraceMessage) -> IO TraceMessage
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ByteString -> IO TraceMessage
forall a (m :: * -> *).
(FromJSON a, MonadThrow m) =>
ByteString -> m a
throwDecodeStrict
TemporalEventBuilderSt
-> IO (Either () (Of TemporalEvent TemporalEventBuilderSt))
go (Maybe TraceMessage
-> Word64 -> Seq TraceMessage -> Bool -> TemporalEventBuilderSt
TemporalEventBuilderSt (TraceMessage -> Maybe TraceMessage
forall a. a -> Maybe a
Just TraceMessage
msg) Word64
nextBeg Seq TraceMessage
nextMsgs Bool
False)
go TemporalEventBuilderSt{nextBuffered :: TemporalEventBuilderSt -> Maybe TraceMessage
nextBuffered = Just TraceMessage
msg, Bool
Word64
Seq TraceMessage
nextBeg :: TemporalEventBuilderSt -> Word64
nextMsgs :: TemporalEventBuilderSt -> Seq TraceMessage
nextTerminal :: TemporalEventBuilderSt -> Bool
nextBeg :: Word64
nextMsgs :: Seq TraceMessage
nextTerminal :: Bool
..} | UTCTime -> Word64
utcToMicroseconds TraceMessage
msg.tmsgAt Word64 -> Word64 -> Bool
forall a. Ord a => a -> a -> Bool
<= Word64
nextBeg Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
+ TemporalEventDurationMicrosec -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral TemporalEventDurationMicrosec
duration =
TemporalEventBuilderSt
-> IO (Either () (Of TemporalEvent TemporalEventBuilderSt))
go (Maybe TraceMessage
-> Word64 -> Seq TraceMessage -> Bool -> TemporalEventBuilderSt
TemporalEventBuilderSt Maybe TraceMessage
forall a. Maybe a
Nothing Word64
nextBeg (Seq TraceMessage
nextMsgs Seq TraceMessage -> TraceMessage -> Seq TraceMessage
forall a. Seq a -> a -> Seq a
|> TraceMessage
msg) Bool
False)
go TemporalEventBuilderSt{nextBuffered :: TemporalEventBuilderSt -> Maybe TraceMessage
nextBuffered = Just TraceMessage
msg, Bool
Word64
Seq TraceMessage
nextBeg :: TemporalEventBuilderSt -> Word64
nextMsgs :: TemporalEventBuilderSt -> Seq TraceMessage
nextTerminal :: TemporalEventBuilderSt -> Bool
nextBeg :: Word64
nextMsgs :: Seq TraceMessage
nextTerminal :: Bool
..} = Either () (Of TemporalEvent TemporalEventBuilderSt)
-> IO (Either () (Of TemporalEvent TemporalEventBuilderSt))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Either () (Of TemporalEvent TemporalEventBuilderSt)
-> IO (Either () (Of TemporalEvent TemporalEventBuilderSt)))
-> Either () (Of TemporalEvent TemporalEventBuilderSt)
-> IO (Either () (Of TemporalEvent TemporalEventBuilderSt))
forall a b. (a -> b) -> a -> b
$ Of TemporalEvent TemporalEventBuilderSt
-> Either () (Of TemporalEvent TemporalEventBuilderSt)
forall a b. b -> Either a b
Right (Of TemporalEvent TemporalEventBuilderSt
-> Either () (Of TemporalEvent TemporalEventBuilderSt))
-> Of TemporalEvent TemporalEventBuilderSt
-> Either () (Of TemporalEvent TemporalEventBuilderSt)
forall a b. (a -> b) -> a -> b
$
Word64 -> [TraceMessage] -> TemporalEvent
TemporalEvent Word64
nextBeg (Seq TraceMessage -> [TraceMessage]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
Foldable.toList Seq TraceMessage
nextMsgs)
TemporalEvent
-> TemporalEventBuilderSt
-> Of TemporalEvent TemporalEventBuilderSt
forall a b. a -> b -> Of a b
:>
Maybe TraceMessage
-> Word64 -> Seq TraceMessage -> Bool -> TemporalEventBuilderSt
TemporalEventBuilderSt (TraceMessage -> Maybe TraceMessage
forall a. a -> Maybe a
Just TraceMessage
msg) (Word64
nextBeg Word64 -> Word64 -> Word64
forall a. Num a => a -> a -> a
+ TemporalEventDurationMicrosec -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral TemporalEventDurationMicrosec
duration) Seq TraceMessage
forall a. Monoid a => a
mempty Bool
False