DetectorGraph 2.0
resuminggraph.cpp File Reference

Basic Counter Graph with persistent storage.

Detailed Description

Basic Counter Graph with persistent storage.

Introduction

This example covers the most basic way to preserve state across Graph instances: how to resume saved state and a neat way to define the initial state.

Graph Architecture

The example uses a minimal Graph with a single Detector and two topics. The Detector counts EventHappened and publishes the total in EventCount.

ResumingCounter

State Persistence; Snapshots & ResumeFromSnapshotTopicState

This example shows how GraphStateStore and StateSnapshot can be used in conjunction with ResumeFromSnapshotTopicState to provide an extensible, robust and transparent state persistence mechanism.

The diagram below shows the life cycle of StateSnapshots during boot.

State Lifetime FSM

This life cycle can be seen in the main of this example:

int main()
{
    DetectorGraph::StateSnapshot primeSnapshot = GetPrimeSnapshot();
    DetectorGraph::StateSnapshot resumeSnapshot = ReadSnapshot(primeSnapshot);

    ResumingGraph resumingGraph = ResumingGraph(resumeSnapshot);
    for (int i = 0; i < 7; ++i)
    {
        resumingGraph.ProcessData(EventHappened());
        cout << "EventCount = " << resumingGraph.GetLastTopicState<EventCount>()->count << endl;

        const auto latestSnapshot = resumingGraph.GetLastState();
        WriteSnapshot(*latestSnapshot);
    }
}

PrimeSnapshot is a StateSnapshot that contains the initial state/data that should always be used - regardless of file-based state persistence. In this example we synthesize PrimeSnapshot this way:

DetectorGraph::StateSnapshot GetPrimeSnapshot()
{
    std::list< ptr::shared_ptr<const DetectorGraph::TopicState> > topicStatesList;
    topicStatesList.push_back(ptr::shared_ptr<const DetectorGraph::TopicState>(new EventCount(1000)));
    // ... add any other TopicStates that need Prime states.
    return DetectorGraph::StateSnapshot(topicStatesList);
}

ResumeSnapshot is the aggregation of TopicStates contained in PrimeSnapshot and what is deserialized from storage in ReadSnapshot():

DetectorGraph::StateSnapshot ReadSnapshot(const DetectorGraph::StateSnapshot& primeSnapshot)
{
    std::list< ptr::shared_ptr<const DetectorGraph::TopicState> > topicStatesList;

    std::ifstream snapshotInStream(kSavedSnapshotFileName);
    if (snapshotInStream.is_open())
    {
        // Generally here you'd have a loop through the TopicStates found on
        // Storage and adding each one to `topicStatesList`
        {
            int count;
            snapshotInStream >> count;
            topicStatesList.push_back(ptr::shared_ptr<const DetectorGraph::TopicState>(new EventCount(count)));
        }
        // ... add deserialization of any other TopicStates of interest.
    }

    return DetectorGraph::StateSnapshot(primeSnapshot, topicStatesList);
}
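
The aggregation step can be pictured as a keyed merge. Judging by the program output shown under "Running the Program" (the first run starts from 1000, the second resumes from 1007), a stored TopicState appears to take precedence over the prime one of the same type. The following is a minimal, self-contained sketch of that idea only. `Snapshot` (a map from topic name to value) and `MergeSnapshots` are hypothetical stand-ins introduced for illustration; they are not part of DetectorGraph, whose StateSnapshot holds typed TopicState objects.

```cpp
#include <map>
#include <string>

// Hypothetical stand-in for a snapshot: topic name -> stored value.
// This is an assumption for illustration, not DetectorGraph::StateSnapshot.
using Snapshot = std::map<std::string, int>;

// Start from the prime values, then let every stored value override the
// prime value that has the same key. Keys found only in `prime` survive.
Snapshot MergeSnapshots(const Snapshot& prime, const Snapshot& stored)
{
    Snapshot merged = prime;
    for (const auto& entry : stored)
    {
        merged[entry.first] = entry.second;
    }
    return merged;
}
```

With nothing in storage the merged result equals the prime snapshot, which is what makes the prime snapshot a convenient place to define initial state.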

ResumeSnapshot is then used to construct a ResumeFromSnapshotTopicState, which is posted to the graph so that Detectors can resume or initialize their state.

void Evaluate(const DetectorGraph::ResumeFromSnapshotTopicState& aResumeFrom)
{
    const auto previousEventCount = aResumeFrom.snapshot.GetState<EventCount>();
    if (previousEventCount)
    {
        mEventCount = *previousEventCount;
    }
}
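
The null check in Evaluate() matters because a snapshot may hold no state of the requested type. The sketch below illustrates that lookup-by-type pattern in isolation. Every name in it (`MiniSnapshot`, `Put`, `MiniCounter`, `Resume`, and the simplified `TopicState` and `EventCount`) is a hypothetical stand-in, and it uses `std::shared_ptr` and `std::type_index` as assumptions; it is not how DetectorGraph implements GetState().

```cpp
#include <map>
#include <memory>
#include <typeindex>
#include <typeinfo>

// Hypothetical stand-ins; these are not the DetectorGraph classes.
struct TopicState
{
    virtual ~TopicState() = default;
};

struct EventCount : TopicState
{
    EventCount() : count(0) {}
    explicit EventCount(int aCount) : count(aCount) {}
    int count;
};

class MiniSnapshot
{
public:
    // Stores one state per concrete type, replacing any earlier one.
    template <class T>
    void Put(std::shared_ptr<const T> aState)
    {
        mStates[std::type_index(typeid(T))] = aState;
    }

    // Returns a null pointer when no state of type T is present.
    template <class T>
    std::shared_ptr<const T> GetState() const
    {
        const auto it = mStates.find(std::type_index(typeid(T)));
        if (it == mStates.end())
        {
            return nullptr;
        }
        return std::static_pointer_cast<const T>(it->second);
    }

private:
    std::map<std::type_index, std::shared_ptr<const TopicState>> mStates;
};

// Same shape as the Evaluate() above: overwrite only when a state exists.
struct MiniCounter
{
    EventCount mEventCount;

    void Resume(const MiniSnapshot& aSnapshot)
    {
        const auto previous = aSnapshot.GetState<EventCount>();
        if (previous)
        {
            mEventCount = *previous;
        }
    }
};
```

A detector written this way keeps its default-constructed state when the snapshot has nothing for it, so adding a new TopicState later does not break resuming from older snapshots.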

From then on mStateStore in ResumingGraph is continually updated from within ResumingGraph::ProcessOutput():

mStateStore.TakeNewSnapshot(mGraph.GetOutputList());
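
Since the output list contains only the TopicStates that changed, each new snapshot has to carry forward everything else from the previous one. A minimal sketch of such a store follows, assuming a simplified `StateMap` (topic name to value) in place of typed TopicStates. `MiniStateStore` and `StateMap` are hypothetical; the method names merely echo those used in this example, and the real GraphStateStore may work differently.

```cpp
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for a snapshot's contents: topic name -> value.
using StateMap = std::map<std::string, int>;

class MiniStateStore
{
public:
    MiniStateStore() : mLast(std::make_shared<const StateMap>()) {}

    // Builds a new immutable snapshot: a copy of the previous one with the
    // changed entries applied on top. Older snapshots stay untouched.
    void TakeNewSnapshot(const StateMap& changedStates)
    {
        StateMap next = *mLast;
        for (const auto& entry : changedStates)
        {
            next[entry.first] = entry.second;
        }
        mLast = std::make_shared<const StateMap>(std::move(next));
    }

    std::shared_ptr<const StateMap> GetLastState() const
    {
        return mLast;
    }

private:
    std::shared_ptr<const StateMap> mLast;
};
```

Handing out shared pointers to const snapshots means a caller can hold on to one while the store keeps advancing.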

The up-to-date latestSnapshot can then be flushed to disk as necessary with WriteSnapshot():

void WriteSnapshot(const DetectorGraph::StateSnapshot& aSnapshot)
{
    std::ofstream snapshotOutStream(kSavedSnapshotFileName);

    // Generally here you'd have a loop through a list of TopicState
    // serializers giving each one a chance to flush that topic's contents to
    // disk. And you'd normally use something fancier for Serialization (e.g.
    // protobufs).
    // For the purposes of this demo we'll mock that in a single method:
    {
        ptr::shared_ptr<const EventCount> countPtr = aSnapshot.GetState<EventCount>();
        if (countPtr)
        {
            snapshotOutStream << countPtr->count << endl;
        }
        // ... add serialization of any other TopicStates of interest.
    }
}
Note
Different applications may choose to do this step differently. Depending on the serialization technology, calling WriteSnapshot() for every processed input can be too expensive. Some applications may choose to call WriteSnapshot() only during shutdown. A more sophisticated approach is to call WriteSnapshot() only when one of a set of critical TopicStates changes. This can be done by iterating through the list returned by mGraph.GetOutputList(), as it contains only the changed TopicStates.
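
The "critical set" approach from the note can be sketched as a simple membership test over the changed topics. In this illustration topics are identified by name, and `ShouldWriteSnapshot` is a hypothetical helper; an application built on DetectorGraph would more likely inspect the TopicState objects in the output list, which is not shown here.

```cpp
#include <set>
#include <string>
#include <vector>

// Hypothetical helper, with topics identified by name for brevity.
// Returns true when at least one changed topic is in the critical set.
bool ShouldWriteSnapshot(const std::vector<std::string>& changedTopics,
                         const std::set<std::string>& criticalTopics)
{
    for (const auto& topic : changedTopics)
    {
        if (criticalTopics.count(topic) > 0)
        {
            return true;
        }
    }
    return false;
}
```

The caller would run this check after each processed input and call WriteSnapshot() only when it returns true, trading some durability of non-critical state for fewer writes.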

Running the Program

Running the program produces:

$ ./resuminggraph.out
DetectorGraph: Graph Initialized
EventCount = 1001
EventCount = 1002
EventCount = 1003
EventCount = 1004
EventCount = 1005
EventCount = 1006
EventCount = 1007

$ ./resuminggraph.out
DetectorGraph: Graph Initialized
EventCount = 1008
EventCount = 1009
EventCount = 1010
EventCount = 1011
EventCount = 1012
EventCount = 1013
EventCount = 1014

$ cat resumingGraphSnapshot.txt
1014
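
The numbers above follow from the prime value of 1000 and seven events per run. As a way to check that arithmetic without building DetectorGraph, the sketch below imitates one program run with plain file I/O. `RunOnce` and its parameters are hypothetical, and unlike ReadSnapshot() above it ignores a file it cannot parse instead of using whatever was read.

```cpp
#include <fstream>
#include <string>

// Hypothetical stand-in for one program run, without DetectorGraph:
// resume from `path` if it holds a count, otherwise start at `primeCount`;
// then count `events` events, persisting after each one.
// Returns the final count.
int RunOnce(const std::string& path, int primeCount, int events)
{
    int count = primeCount;
    {
        std::ifstream in(path);
        int stored = 0;
        if (in.is_open() && (in >> stored))
        {
            count = stored; // stored state overrides the prime state
        }
    }
    for (int i = 0; i < events; ++i)
    {
        ++count;
        std::ofstream out(path); // truncates the file on every write
        out << count << '\n';
    }
    return count;
}
```

Called twice with the same path, a prime count of 1000 and 7 events, and starting with no file present, the first call returns 1007 and the second 1014, matching the two runs above.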

Definition in file resuminggraph.cpp.