Commit aed793a1 authored by Tim Bleimehl's avatar Tim Bleimehl 🤸🏼
Browse files

wip

parent 37cf2d00
# Possible vis libs
https://networkx.org/documentation/latest/tutorial.html#drawing-graphs
https://github.com/jexp/neo4j-graphviz
\ No newline at end of file
# neo4j-meta-logger # neo4j-meta-logger
A python module to log your neo4j graph transformation. A python module to log and track your neo4j graph schema transformations.
\ No newline at end of file
`call db.schema.visualization` + a time machine + some helpful tools and statistics
Works fast with neo4j graphs of any size 💪
Maintainer: Tim Bleimehl
Status: Pre-alpha
# Example
Let's create a sample graph with Python and Neo4j:
```python
import py2neo
from neo_meta_logger import NeoMetaLogger
g = py2neo.Graph(name="test_graph")
g.run(
"CREATE p = (:Human{name:'Amina Okujewa'})-[:LIVES_ON]->(:World {name: 'Earth'})"
)
g.run(
"CREATE p = (:Cat{name:'Grumpy Cat'})-[:LIVES_ON]->(:World {name: 'Internet'})"
)
g.run(
"MATCH (wI:World{name:'Internet'}),(wE:World{name:'Earth'}) CREATE (wI)-[:EXISTS_ON]->(wE)"
)
```
Our graph looks like this:
!["docs/01_test_base_graph.png"](docs/01_test_base_graph.png)
and the schema will look like this:
!["docs/03_schema.png"](docs/03_schema.png)
Let's capture the current state so that we can analyse the changes later.
```python
meta_logger = NeoMetaLogger(g)
meta_logger.capture()
```
Now let's make some changes to our graph's content:
```python
g.run(
"MATCH (wI:World{name:'Internet'}),(wE:World{name:'Earth'}) CREATE (as:Human{name:'Aaron Swartz'})-[:LIVES_ON]->(wI), (as)-[:LIVES_ON]->(wE)"
)
```
Our graph now looks like this
!["docs/02_graph_extended.png"](docs/02_graph_extended.png)
The schema still looks the same. We won't be able to recognize the changes without some help:
`call db.schema.visualization`
!["docs/03_schema.png"](docs/03_schema.png)
Here comes `NeoMetaLogger` to the rescue.
Let's do another capture to compare the changes we made:
```python
# reuse the `meta_logger` created above - a new instance would start without the first capture point
meta_logger.capture()
```
Now we can analyze the changes in the graph:
```python
meta_logger.get_numeric_last_changes()
```
This outputs:
`{'labels': {'Human': 1}, 'relations': {'LIVES_ON': 2}}`
We can see we created one new Node with the Label `Human` and 2 new relations named `LIVES_ON`.
This can already be valuable meta information, but wait, there's more...
Let's visualize the schema changes in another graph.
```python
changes_subgraph = meta_logger.get_schemagraph_last_changes()
schema_g = py2neo.Graph(name="test_graph")
schema_g.merge(changes_subgraph)
```
Now we can visualize the changes in our schema:
!["docs/04_schema_changes.png"](docs/04_schema_changes.png)
We see only the part of the schema that changed.
We can also recall any old state of our graph with
```python
meta_logger.capture_points[0].schema
```
This will return a `py2neo.Subgraph` of the schema from the beginning of our script. Same as `call db.schema.visualization` but with a time machine :)
\ No newline at end of file
...@@ -7,15 +7,23 @@ class CapturePoint: ...@@ -7,15 +7,23 @@ class CapturePoint:
"""The state of a Neo4j database in terms of relationship and node count at a certain time""" """The state of a Neo4j database in terms of relationship and node count at a certain time"""
def __init__(
    self,
    parent_logger: "NeoMetaLogger",
    name: str,
    labels_count: Dict[str, int],
    relations_count: Dict[str, int],
    neo4j_schema_vis_data: Dict,
):
    """Record the counts and the schema of the database at creation time.

    Args:
        parent_logger: The logger that owns this capture point.
        name: Optional human readable name of the capture point.
        labels_count: Number of nodes per label.
        relations_count: Number of relationships per relationship type.
        neo4j_schema_vis_data: Raw schema data that is handed to
            `GraphSchema.from_neo4j_schema_vis_data`.
    """
    self.parent_logger = parent_logger
    self.name = name
    self.timestamp: float = time.time()
    self.labels = labels_count
    self.relations = relations_count
    # Only a non-empty name is attached to the schema as an extra property.
    if name:
        name_props = {"name": name}
    else:
        name_props = {}
    # The schema is built last: the builder receives `self` and reads the
    # attributes assigned above (`parent_logger`, `labels`).
    self.schema: GraphSchema = GraphSchema.from_neo4j_schema_vis_data(
        neo4j_schema_vis_data,
        parent_capture_point=self,
        extra_props=name_props,
    )
def __hash__(self): def __hash__(self):
return hash(self.timestamp) return hash(self.timestamp)
...@@ -24,9 +32,108 @@ class CapturePoint: ...@@ -24,9 +32,108 @@ class CapturePoint:
return type(other) is type(self) and self.timestamp == other.timestamp return type(other) is type(self) and self.timestamp == other.timestamp
class GraphSchema(py2neo.Subgraph):
    """A schema graph rebuilt from `call db.schema.visualization` output.

    The nodes and relationships delivered by Neo4j are bound to a database and
    carry IDs that are random on every call. This class rebuilds them as fresh
    `py2neo.Node` / `py2neo.Relationship` objects without those IDs.
    """

    @classmethod
    def from_neo4j_schema_vis_data(
        cls,
        neo4j_schema_vis_data: Dict,
        parent_capture_point: "CapturePoint",
        extra_props: Dict = None,
    ) -> "GraphSchema":
        """Build a schema graph from the result of `call db.schema.visualization`.

        Args:
            neo4j_schema_vis_data: Mapping with the keys "nodes" and
                "relationships" holding the schema nodes and relationships.
            parent_capture_point: Capture point the schema belongs to. Its
                `labels` counts and its `parent_logger.all_schema_nodes`
                registry are used to create or reuse the schema nodes.
            extra_props: Optional properties added to every created node and
                relationship. The passed dict is not modified.

        Returns:
            An instance of `cls` containing every schema node (including nodes
            without any relationship) and every schema relationship.

        Raises:
            ValueError: If a relationship references a start or end node that
                is not part of `neo4j_schema_vis_data["nodes"]`.
        """
        logger = parent_capture_point.parent_logger
        if logger.all_schema_nodes is None:
            logger.all_schema_nodes = {}
        # Copy instead of mutating the caller's dict; the marker property
        # identifies everything created by this module.
        props = (extra_props or {}) | {"__neo_meta_logger_node": True}

        # Clean every node exactly once and remember it by the identity of the
        # original node, so relationship endpoints are a dictionary lookup
        # instead of a scan over all nodes per relationship.
        clean_nodes = []
        clean_by_identity = {}
        for node in neo4j_schema_vis_data["nodes"]:
            clean_node = cls._get_or_create_unbound_schema_node(
                node=node,
                parent_capture_point=parent_capture_point,
                extra_props=props,
            )
            clean_nodes.append(clean_node)
            clean_by_identity.setdefault(node.identity, clean_node)

        clean_rels = []
        for rel in neo4j_schema_vis_data["relationships"]:
            try:
                start_node = clean_by_identity[rel.start_node.identity]
                end_node = clean_by_identity[rel.end_node.identity]
            except KeyError as err:
                raise ValueError(
                    f"Relationship '{type(rel).__name__}' references a node that is not part of the schema nodes"
                ) from err
            clean_rels.append(
                py2neo.Relationship(
                    start_node,
                    type(rel).__name__,
                    end_node,
                    **props,
                )
            )
        # Construct through `cls` so the result really is a `GraphSchema`.
        return cls(clean_nodes, clean_rels)

    @classmethod
    def _get_or_create_unbound_schema_node(
        cls,
        node: py2neo.Node,
        parent_capture_point: "CapturePoint",
        extra_props: Dict = None,
    ) -> py2neo.Node:
        """Return the schema node for the labels of `node`, creating it if needed.

        A node registered in `parent_logger.all_schema_nodes` is reused as long
        as its "__node_count" property equals the current count of the node's
        first label in `parent_capture_point.labels`. Otherwise a new node is
        created and replaces the registered one.

        Args:
            node: Schema node as delivered by Neo4j.
            parent_capture_point: Capture point providing the label counts and
                (through its logger) the registry of schema nodes.
            extra_props: Optional extra properties for a newly created node.

        Returns:
            The reused or newly created schema node.
        """
        registry = parent_capture_point.parent_logger.all_schema_nodes
        labels = tuple(node.labels)
        current_count = parent_capture_point.labels[labels[0]]

        cached_node = registry.get(labels)
        if cached_node is not None and cached_node["__node_count"] == current_count:
            return cached_node

        clean_node = py2neo.Node(
            *labels,
            **(
                (extra_props or {})
                | {
                    "__label_name": ":".join(labels),
                    "__node_count": current_count,
                }
            ),
        )
        # Primary label/key are set so the node can be merged by its label name.
        clean_node.__primarykey__ = "__label_name"
        clean_node.__primarylabel__ = labels[0]
        registry[labels] = clean_node
        return clean_node
class NeoMetaLogger: class NeoMetaLogger:
def __init__(self, connection: Union[py2neo.Graph, Dict]): def __init__(self, connection: Union[py2neo.Graph, Dict]):
self.capture_points: List[CapturePoint] = [] self.capture_points: List[CapturePoint] = []
self.all_schema_nodes: Dict[tuple, py2neo.Node] = None
if isinstance(connection, dict): if isinstance(connection, dict):
self.graph: py2neo.Graph = py2neo.Graph(**connection) self.graph: py2neo.Graph = py2neo.Graph(**connection)
elif isinstance(connection, py2neo.Graph): elif isinstance(connection, py2neo.Graph):
...@@ -36,12 +143,18 @@ class NeoMetaLogger: ...@@ -36,12 +143,18 @@ class NeoMetaLogger:
f"Expected 'py2neo.Graph' or 'dict'. Got '{type(connection)}'" f"Expected 'py2neo.Graph' or 'dict'. Got '{type(connection)}'"
) )
def capture(self, name: str = None):
    """Create a new capture point of the current database state and store it.

    Args:
        name: Optional name for the capture point.
    """
    labels_count = self._count_labels()
    relations_count = self._count_relations()
    schema_vis_data = self._query_schema()
    new_point = CapturePoint(
        name=name,
        parent_logger=self,
        labels_count=labels_count,
        relations_count=relations_count,
        neo4j_schema_vis_data=schema_vis_data,
    )
    self.capture_points.append(new_point)
def _query_schema(self): def _query_schema(self):
...@@ -53,10 +166,9 @@ class NeoMetaLogger: ...@@ -53,10 +166,9 @@ class NeoMetaLogger:
def _count_labels(self) -> Dict[str, int]: def _count_labels(self) -> Dict[str, int]:
all_labels: List[str] = self.graph.run( all_labels: List[str] = self.graph.run(
"CALL db.labels() yield label return collect(label) as res" "CALL db.labels() yield label return collect(label) as res"
).data()[0]["res"][0] ).data()[0]["res"]
labels_count: Dict[str, int] = {} labels_count: Dict[str, int] = {}
for label in all_labels: for label in all_labels:
query_label_count = f""" query_label_count = f"""
MATCH (n:{label}) MATCH (n:{label})
RETURN count(n) AS res RETURN count(n) AS res
...@@ -78,21 +190,62 @@ class NeoMetaLogger: ...@@ -78,21 +190,62 @@ class NeoMetaLogger:
rels_count[rel] = self.graph.run(query_rel_count).data()[0]["res"] rels_count[rel] = self.graph.run(query_rel_count).data()[0]["res"]
return rels_count return rels_count
def get_capture_point_by(
    self,
    capture_point_index: int = None,
    capture_point_time: float = None,
    capture_point_name: str = None,
):
    """Look up a capture point by timestamp, list index or name.

    The first criterion that is not `None` wins, checked in this order:
    `capture_point_time`, `capture_point_index`, `capture_point_name`.

    Args:
        capture_point_index: Index into `self.capture_points`. Negative
            indexes count from the end; `0` is the first capture point.
        capture_point_time: Exact `timestamp` of the capture point.
        capture_point_name: `name` of the capture point.

    Returns:
        The matching capture point, or `None` if no criterion was given.

    Raises:
        IndexError: If `capture_point_index` is out of range.
        StopIteration: If no capture point has the given time or name.
    """
    # Compare against None explicitly: index 0 and timestamp 0.0 are falsy
    # but still valid lookups.
    if capture_point_time is not None:
        return next(
            cp for cp in self.capture_points if capture_point_time == cp.timestamp
        )
    if capture_point_index is not None:
        return self.capture_points[capture_point_index]
    if capture_point_name is not None:
        return next(
            cp for cp in self.capture_points if capture_point_name == cp.name
        )
    return None
def get_schemagraph_last_changes(self) -> py2neo.Subgraph:
    """Return the schema changes relative to the second to last capture point.

    Fetches the capture point at index -2 and passes it to
    `get_schemagraph_changes_since`.
    """
    previous_capture = self.get_capture_point_by(capture_point_index=-2)
    return self.get_schemagraph_changes_since(previous_capture)
def get_schemagraph_changes_since(
    self,
    capture_point: CapturePoint = None,
) -> py2neo.Subgraph:
    """Get the schema changes between a given capture point and the latest capture.

    Args:
        capture_point: The capture point to compare the latest capture point
            (`self.capture_points[-1]`) against.

    Returns:
        py2neo.Subgraph: The result of `get_schemagraph_changes` for the given
        capture point and the latest capture point.
    """
    current_capture_point: CapturePoint = (
        self.capture_points[-1] if self.capture_points else None
    )
    return self.get_schemagraph_changes(
        from_capture=capture_point, to_capture=current_capture_point
    )
def get_schemagraph_changes(
    self, from_capture: CapturePoint, to_capture: CapturePoint
) -> py2neo.Subgraph:
    """Return the schema of `from_capture` minus the schema of `to_capture`.

    Args:
        from_capture: Capture point whose schema is the left operand.
        to_capture: Capture point whose schema is subtracted.
    """
    # NOTE(review): the difference is taken as "from - to"; confirm this is
    # the intended direction for showing what changed.
    older_schema = from_capture.schema
    newer_schema = to_capture.schema
    return older_schema - newer_schema
def get_changes_since( def get_numeric_last_changes(self) -> Dict[str, Dict[str, int]]:
"""Get changes, in terms of quantity, of labels and relations since the last capture compared to the current capture
Returns:
Dict[str,Dict[str,int]]: A base dictonary "{'labels':{...},'relation':{...}}" containing two dictoniries listing changes (in terms of quantity) for labels and relations.
"""
return self.get_numeric_changes_since(
self.get_capture_point_by(capture_point_index=-2)
)
def get_numeric_changes_since(
self, self,
capture_point: CapturePoint = None, capture_point: CapturePoint = None,
capture_point_index: int = None,
capture_point_time: float = None,
) -> Dict[str, Dict[str, int]]: ) -> Dict[str, Dict[str, int]]:
"""Get changes, in terms of quantity, of labels and relations since the last capture compared to the current capture """Get changes, in terms of quantity, of labels and relations since the last capture compared to the current capture
...@@ -102,18 +255,11 @@ class NeoMetaLogger: ...@@ -102,18 +255,11 @@ class NeoMetaLogger:
current_capture_point: CapturePoint = ( current_capture_point: CapturePoint = (
self.capture_points[-1] if self.capture_points else None self.capture_points[-1] if self.capture_points else None
) )
compare_capture_point: CapturePoint = None return self.get_numeric_changes(
if capture_point_time: from_capture=capture_point, to_capture=current_capture_point
raise NotImplementedError()
elif capture_point_index:
compare_capture_point = self.capture_points[capture_point_index]
elif capture_point:
compare_capture_point = capture_point
return self.get_changes(
from_capture=compare_capture_point, to_capture=current_capture_point
) )
def get_changes(self, from_capture: CapturePoint, to_capture: CapturePoint): def get_numeric_changes(self, from_capture: CapturePoint, to_capture: CapturePoint):
if len(self.capture_points) <= 1: if len(self.capture_points) <= 1:
raise ValueError( raise ValueError(
f"You need at least 2 capture points to compare any changes. Got only {len(self.capture_points)} points" f"You need at least 2 capture points to compare any changes. Got only {len(self.capture_points)} points"
......
import py2neo
g = py2neo.Graph()
g.run("match (n) where id(n)=-1 with n match (n)-[r]-(m) return id(r)")
import sys import sys
import os import os
from typing import Dict,List from typing import Dict, List
import json import json
import py2neo import py2neo
from DZDutils.neo4j import wait_for_db_boot from DZDutils.neo4j import wait_for_db_boot
if __name__ == "__main__": if __name__ == "__main__":
SCRIPT_DIR = os.path.dirname( SCRIPT_DIR = os.path.dirname(
os.path.realpath(os.path.join(os.getcwd(), os.path.expanduser(__file__))) os.path.realpath(os.path.join(os.getcwd(), os.path.expanduser(__file__)))
...@@ -13,12 +14,32 @@ if __name__ == "__main__": ...@@ -13,12 +14,32 @@ if __name__ == "__main__":
sys.path.insert(0, os.path.normpath(SCRIPT_DIR)) sys.path.insert(0, os.path.normpath(SCRIPT_DIR))
from neo_meta_logger import NeoMetaLogger from neo_meta_logger import NeoMetaLogger
NEO4J: Dict = json.loads(os.getenv("NEO4J","{}"))
NEO4J: Dict = json.loads(os.getenv("NEO4J", "{}"))
wait_for_db_boot(NEO4J) wait_for_db_boot(NEO4J)
graph = py2neo.Graph(**NEO4J) graph = py2neo.Graph(**NEO4J)
mlog = NeoMetaLogger({})
graph.run("CREATE p = (:NEO_META_LOG_TESTNODE{name:'1'})-[:NEO_META_LOG_TESTREL]->(:NEO_META_LOG_TARGET_TESTNODE {name: '2'})") graph.run("CREATE OR REPLACE DATABASE schematest")
graph.run("CREATE OR REPLACE DATABASE test")
test_graph = py2neo.Graph(**(NEO4J | {"name": "test"}))
mlog = NeoMetaLogger(test_graph)
test_graph.run(
"CREATE p = (:Human{name:'Amina Okujewa'})-[:LIVES_ON]->(:World {name: 'Earth'})"
)
test_graph.run(
"CREATE p = (:Cat{name:'Grumpy Cat'})-[:LIVES_ON]->(:World {name: 'Internet'})"
)
test_graph.run(
"MATCH (wI:World{name:'Internet'}),(wE:World{name:'Earth'}) CREATE (wI)-[:EXISTS_ON]->(wE)"
)
mlog.capture() mlog.capture()
graph.run("CREATE p = (:NEO_META_LOG_TESTNODE{name:'3'})-[:NEO_META_LOG_TESTREL]->(:NEO_META_LOG_TARGET_TESTNODE {name: '4'})") test_graph.run(
"MATCH (wI:World{name:'Internet'}),(wE:World{name:'Earth'}) CREATE (as:Human{name:'Aaron Swartz'})-[:LIVES_ON]->(wI), (as)-[:LIVES_ON]->(wE)"
)
mlog.capture() mlog.capture()
print(mlog.get_last_changes())
\ No newline at end of file print(mlog.get_numeric_last_changes())
schema_graph = py2neo.Graph(**(NEO4J | {"name": "schematest"}))
schema_graph.merge(mlog.get_schemagraph_last_changes())
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment