DetectorGraph  2.0
resuminggraph.cpp
Go to the documentation of this file.
1 // Copyright 2017 Nest Labs, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "graph.hpp"
16 #include "detector.hpp"
17 #include "futurepublisher.hpp"
18 #include "sharedptr.hpp"
19 #include "processorcontainer.hpp"
20 #include "graphstatestore.hpp"
22 
23 #include <iostream>
24 #include <fstream>
25 
26 using std::cout;
27 using std::endl;
28 
29 /**
30  * @file resuminggraph.cpp
31  * @brief Basic Counter Graph with persistent storage.
32  *
33  * @section ex-rg-intro Introduction
34  * This example covers the most basic way to preserve state across Graph
35  * instances.
36  *
37  * This covers how to resume state and a neat way to define initial state.
38  *
39  * @section ex-rg-arch Graph Architecture
40  *
41  * The example uses a minimal Graph with a single Detector and two topics.
42  * The Detector counts `EventHappened` and publishes the total in `EventCount`.
43  *
44  * @dot "ResumingCounter"
45 digraph GraphAnalyzer {
46  rankdir = "LR";
47  node[fontname=Helvetica];
48  size="12,5";
49  "EventHappened" [label="0:EventHappened",style=filled, shape=box, color=lightblue];
50  "EventHappened" -> "EventCounter";
51  "EventCounter" -> "EventCount";
52  "EventCounter" [label="1:EventCounter", color=blue];
53  "EventCount" [label="2:EventCount",style=filled, shape=box, color=red];
54 }
55  * @enddot
56  *
57  * @section ex-rg-state-persistence State Persistence; Snapshots & ResumeFromSnapshotTopicState
58  *
59  * This example shows how [GraphStateStore](@ref DetectorGraph::GraphStateStore
60  * ) and [StateSnapshot](@ref DetectorGraph::StateSnapshot) can be used in
61  * conjunction with [ResumeFromSnapshotTopicState](@ref DetectorGraph::ResumeFromSnapshotTopicState)
62  * to provide an extensible, robust and transparent state persistence
63  * mechanism.
64  *
65  * The diagram below shows the life cycle of StateSnapshots during boot.
66  * @dot "State Lifetime FSM"
67  digraph StateLifetime {
68  node[fontname=Helvetica];
69  edge[lp="r", fontname="Times-Italic", fontcolor=blue];
70  "first_ever" [label="", shape=none];
71  "first_ever" -> "PrimeSnapshot" [label="Start"];
72  "PrimeSnapshot" [fontname=Monospace];
73  "ResumeSnapshot" [fontname=Monospace];
74  "PrimeSnapshot" -> "ResumeSnapshot" [label="Reads from Storage"];
75  "ResumeSnapshot" -> "LatestSnapshot" [label="First Graph Evaluation"];
76  subgraph clusterrunning {
77  style=filled;
78  color=lightskyblue;
79  labeljust="r";
80  label="Running & Updating GraphStateStore";
81  "LatestSnapshot" [fontname=Monospace];
82  "LatestSnapshot" -> "LatestSnapshot" [label="Subsequent Evaluations\n+\n Write to Storage"];
83  }
84  }
85  * @enddot
86  *
87  * This life cycle can be seen in the `main` of this example:
88  @snippetlineno resuminggraph.cpp main
89  *
90  * `PrimeSnapshot` is a StateSnapshot that contains the initial state/data that
91  * should always be used - regardless of file-based state persistence.
92  * In this example we synthesize `PrimeSnapshot` this way:
93  @snippetlineno resuminggraph.cpp PrimeSnapshot
94 
95  * `ResumeSnapshot` is the aggregation of TopicStates contained in
96  * `PrimeSnapshot` and what is deserialized from storage in `ReadSnapshot()`:
97  @snippetlineno resuminggraph.cpp Deserialization
98 
99  * `ResumeSnapshot` is then used to construct a `ResumeFromSnapshotTopicState`
100  * which is then posted to the graph to allow Detectors to resume/initialize
101  * their state.
102  @snippetlineno resuminggraph.cpp Evaluate-ResumeFromSnapshot
103 
104  * From then on `mStateStore` in `ResumingGraph` is continually updated
105  * from within `ResumingGraph::ProcessOutput()`:
106  @snippetlineno resuminggraph.cpp UpdateStateStore
107 
108  * The up-to-date `latestSnapshot` can then be flushed to disk as necessary
109  * with `WriteSnapshot()`:
110  @snippetlineno resuminggraph.cpp Serialization
111 
112  @note
113 Different applications may choose to do this step differently. Depending on
114 your chosen Serialization technology it can be too expensive to call
115 `WriteSnapshot()` for each new processed input. Some applications may choose
116 to only call `WriteSnapshot()` during shutdown. A more sophisticated approach
117 is to call `WriteSnapshot()` only when a `TopicState` in a set of _critical_
118 ones changes. This can be done by iterating through the list in
119 `mGraph.GetOutputList()` as it only contains the changed `TopicStates`.
120 
121  *
122  * @section ex-rg-instance Running the Program
123  * Running the program produces:
124  \verbatim
125 $ ./resuminggraph.out
126 DetectorGraph: Graph Initialized
127 EventCount = 1001
128 EventCount = 1002
129 EventCount = 1003
130 EventCount = 1004
131 EventCount = 1005
132 EventCount = 1006
133 EventCount = 1007
134 
135 $ ./resuminggraph.out
136 DetectorGraph: Graph Initialized
137 EventCount = 1008
138 EventCount = 1009
139 EventCount = 1010
140 EventCount = 1011
141 EventCount = 1012
142 EventCount = 1013
143 EventCount = 1014
144 
145 $ cat resumingGraphSnapshot.txt
146 1014
147  \endverbatim
148  *
149  *
150  * @cond DO_NOT_DOCUMENT
151  */
152 
/**
 * @brief Ids handed out by the GetId() implementations in this file.
 */
enum class ResumingGraphTopicStateIds
{
    kEventCount = 0, ///< Id returned by EventCount::GetId().
};
156 
157 struct EventHappened : public DetectorGraph::TopicState
158 {
159 };
160 
161 //! [NamedEventCount]
162 struct EventCount : public DetectorGraph::TopicState
163 {
164  EventCount(int aCount = 0) : count(aCount) {}
165  int count;
166 
167  DetectorGraph::TopicStateIdType GetId() const
168  {
169  return static_cast<DetectorGraph::TopicStateIdType>(
170  ResumingGraphTopicStateIds::kEventCount);
171  }
172 };
173 //! [NamedEventCount]
174 
175 //![ResumingCountDetector]
176 class ResumingCountDetector : public DetectorGraph::Detector
177 , public DetectorGraph::SubscriberInterface<DetectorGraph::ResumeFromSnapshotTopicState>
178 , public DetectorGraph::SubscriberInterface<EventHappened>
179 , public DetectorGraph::Publisher<EventCount>
180 {
181 public:
182  ResumingCountDetector(DetectorGraph::Graph* graph) : DetectorGraph::Detector(graph), mEventCount(0)
183  {
184  Subscribe<DetectorGraph::ResumeFromSnapshotTopicState>(this);
185  Subscribe<EventHappened>(this);
186  SetupPublishing<EventCount>(this);
187  }
188 
189  //![Evaluate-ResumeFromSnapshot]
190  void Evaluate(const DetectorGraph::ResumeFromSnapshotTopicState& aResumeFrom)
191  {
192  const auto previousEventCount = aResumeFrom.snapshot.GetState<EventCount>();
193  if (previousEventCount)
194  {
195  mEventCount = *previousEventCount;
196  }
197  }
198  //![Evaluate-ResumeFromSnapshot]
199 
200  void Evaluate(const EventHappened&)
201  {
202  mEventCount.count++;
203  }
204  void CompleteEvaluation()
205  {
206  Publish(mEventCount);
207  }
208 
209 private:
210  EventCount mEventCount;
211 };
212 //![ResumingCountDetector]
213 
214 //![ResumingGraph]
215 class ResumingGraph : public DetectorGraph::ProcessorContainer
216 {
217 public:
218  ResumingGraph(const DetectorGraph::StateSnapshot& aInitialSnapshot)
219  : mResumingCountDetector(&mGraph)
220  {
221  ProcessData(DetectorGraph::ResumeFromSnapshotTopicState(aInitialSnapshot));
222  }
223 
224  ResumingCountDetector mResumingCountDetector;
225 
226  virtual void ProcessOutput()
227  {
228  //![UpdateStateStore]
229  mStateStore.TakeNewSnapshot(mGraph.GetOutputList());
230  //![UpdateStateStore]
231  }
232 
233  template<class TTopicState> ptr::shared_ptr<const TTopicState> GetLastTopicState() const
234  {
235  DG_ASSERT(DetectorGraph::TopicState::GetId<TTopicState>()
237 
238  return ptr::static_pointer_cast<const TTopicState>(
239  mStateStore.GetLastState()->GetState<TTopicState>());
240  }
241  ptr::shared_ptr<const DetectorGraph::StateSnapshot> GetLastState() const
242  {
243  return mStateStore.GetLastState();
244  }
245 
246 private:
247  DetectorGraph::GraphStateStore mStateStore;
248 };
249 
250 //![ResumingGraph]
251 
/// Name of the file WriteSnapshot() writes to and ReadSnapshot() reads from.
constexpr char kSavedSnapshotFileName[]{"resumingGraphSnapshot.txt"};
253 
254 //![PrimeSnapshot]
255 DetectorGraph::StateSnapshot GetPrimeSnapshot()
256 {
257  std::list< ptr::shared_ptr<const DetectorGraph::TopicState> > topicStatesList;
258  topicStatesList.push_back(ptr::shared_ptr<const DetectorGraph::TopicState>(new EventCount(1000)));
259  // ... add any other TopicStates that need Prime states.
260  return DetectorGraph::StateSnapshot(topicStatesList);
261 }
262 //![PrimeSnapshot]
263 
264 //![Serialization]
265 void WriteSnapshot(const DetectorGraph::StateSnapshot& aSnapshot)
266 {
267  std::ofstream snapshotOutStream(kSavedSnapshotFileName);
268 
269  // Generally here you'd have a loop through a list of TopicState
270  // serializers giving each one a chance to flush that topic's contents to
271  // disk. And you'd normally use something fancier for Serialization (e.g.
272  // protobufs).
273  // For the purposes of this demo we'll mock that in a single method:
274  {
275  ptr::shared_ptr<const EventCount> countPtr = aSnapshot.GetState<EventCount>();
276  if (countPtr)
277  {
278  snapshotOutStream << countPtr->count << endl;
279  }
280  // ... add serialization of any other TopicStates of interest.
281  }
282 }
283 //![Serialization]
284 
285 //![Deserialization]
286 DetectorGraph::StateSnapshot ReadSnapshot(const DetectorGraph::StateSnapshot& primeSnapshot)
287 {
288  std::list< ptr::shared_ptr<const DetectorGraph::TopicState> > topicStatesList;
289 
290  std::ifstream snapshotInStream(kSavedSnapshotFileName);
291  if (snapshotInStream.is_open())
292  {
293  // Generally here you'd have a loop through the TopicStates found on
294  // Storage and adding each one to `topicStatesList`
295  {
296  int count;
297  snapshotInStream >> count;
298  topicStatesList.push_back(ptr::shared_ptr<const DetectorGraph::TopicState>(new EventCount(count)));
299  }
300  // ... add deserialization of any other TopicStates of interest.
301  }
302 
303  return DetectorGraph::StateSnapshot(primeSnapshot, topicStatesList);
304 }
305 //![Deserialization]
306 
307 //![main]
308 int main()
309 {
310  DetectorGraph::StateSnapshot primeSnapshot = GetPrimeSnapshot();
311  DetectorGraph::StateSnapshot resumeSnapshot = ReadSnapshot(primeSnapshot);
312 
313  ResumingGraph resumingGraph = ResumingGraph(resumeSnapshot);
314  for (int i = 0; i < 7; ++i)
315  {
316  resumingGraph.ProcessData(EventHappened());
317  cout << "EventCount = " << resumingGraph.GetLastTopicState<EventCount>()->count << endl;
318 
319  const auto latestSnapshot = resumingGraph.GetLastState();
320  WriteSnapshot(*latestSnapshot);
321  }
322 }
323 //![main]
324 
325 /// @endcond DO_NOT_DOCUMENT
Implements a graph of Topics & Detectors with Input/Output APIs.
Definition: graph.hpp:127
A Base class for a basic Graph container.
ptr::shared_ptr< const TopicState > GetState(TopicStateIdType aId) const
Returns a TopicState for a given Id.
The collection of TopicStates that represents the graph state so far.
A StateSnapshot keeper for DetectorGraph TopicStates.
Base struct for topic data types.
Definition: topicstate.hpp:52
Base class that implements a Publisher behavior.
Definition: publisher.hpp:66
A unit of logic in a DetectorGraph.
Definition: detector.hpp:68
A Pure interface that declares the Subscriber behavior.
int TopicStateIdType
Definition: topicstate.hpp:27
#define DG_ASSERT(condition)
Definition: dgassert.hpp:20