1"""
2Time Series Graphs
3"""
4
5import itertools
6
7import networkx as nx
8
9__all__ = ["visibility_graph"]
10
11
12@nx._dispatchable(graphs=None, returns_graph=True)
13def visibility_graph(series):
14 """
15 Return a Visibility Graph of an input Time Series.
16
17 A visibility graph converts a time series into a graph. The constructed graph
18 uses integer nodes to indicate which event in the series the node represents.
19 Edges are formed as follows: consider a bar plot of the series and view that
20 as a side view of a landscape with a node at the top of each bar. An edge
21 means that the nodes can be connected by a straight "line-of-sight" without
22 being obscured by any bars between the nodes.
23
24 The resulting graph inherits several properties of the series in its structure.
25 Thereby, periodic series convert into regular graphs, random series convert
26 into random graphs, and fractal series convert into scale-free networks [1]_.
27
28 Parameters
29 ----------
30 series : Sequence[Number]
31 A Time Series sequence (iterable and sliceable) of numeric values
32 representing times.
33
34 Returns
35 -------
36 NetworkX Graph
37 The Visibility Graph of the input series
38
39 Examples
40 --------
41 >>> series_list = [range(10), [2, 1, 3, 2, 1, 3, 2, 1, 3, 2, 1, 3]]
42 >>> for s in series_list:
43 ... g = nx.visibility_graph(s)
44 ... print(g)
45 Graph with 10 nodes and 9 edges
46 Graph with 12 nodes and 18 edges
47
48 References
49 ----------
50 .. [1] Lacasa, Lucas, Bartolo Luque, Fernando Ballesteros, Jordi Luque, and Juan Carlos Nuno.
51 "From time series to complex networks: The visibility graph." Proceedings of the
52 National Academy of Sciences 105, no. 13 (2008): 4972-4975.
53 https://www.pnas.org/doi/10.1073/pnas.0709247105
54 """
55
56 # Sequential values are always connected
57 G = nx.path_graph(len(series))
58 nx.set_node_attributes(G, dict(enumerate(series)), "value")
59
60 # Check all combinations of nodes n series
61 for (n1, t1), (n2, t2) in itertools.combinations(enumerate(series), 2):
62 # check if any value between obstructs line of sight
63 slope = (t2 - t1) / (n2 - n1)
64 offset = t2 - slope * n2
65
66 obstructed = any(
67 t >= slope * n + offset
68 for n, t in enumerate(series[n1 + 1 : n2], start=n1 + 1)
69 )
70
71 if not obstructed:
72 G.add_edge(n1, n2)
73
74 return G