
Sessions

GraphSession and AsyncGraphSession execute Cypher queries against a Neo4j-compatible database and hydrate results into your Pydantic models. They are the execution layer that pairs with the Query builder and Repository.

Constructor takes a database instance

GraphSession(db, schema) accepts a Neo4jDatabase instance (or any duck-typed object with execute()), not a URI / user / password triple. The same applies to AsyncGraphSession and Repository. See API caveats.

GraphSession

from cypher_validator import GraphSession, Neo4jDatabase, GraphSchema

db = Neo4jDatabase("bolt://localhost:7687", "neo4j", "password")
schema = GraphSchema.from_models([Person, Movie, ActedIn])
session = GraphSession(db, schema)

Constructor

GraphSession(db: Any, schema: GraphSchema | None = None)
  • db — any object with execute(cypher, params) -> list[dict]. Neo4jDatabase is the canonical choice but a _FakeDB mock works fine in tests.
  • schema — optional. Required only for apply_ddl(). Other methods don't need it.
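Because db is duck-typed, a test double only has to implement execute(cypher, params) -> list[dict]. The sketch below is a hypothetical FakeDB (not the library's own _FakeDB). It records every call and returns canned rows. The optional close() is included because AsyncGraphSession calls it on exit when it exists.

```python
from __future__ import annotations

from typing import Any


class FakeDB:
    """Hypothetical test double: only execute(cypher, params) is required."""

    def __init__(self, rows: list[dict[str, Any]] | None = None) -> None:
        self.rows = rows or []  # canned rows returned for every query
        self.calls: list[tuple[str, dict[str, Any]]] = []  # log of (cypher, params)
        self.closed = False

    def execute(self, cypher: str, params: dict[str, Any] | None = None) -> list[dict[str, Any]]:
        self.calls.append((cypher, dict(params or {})))
        return list(self.rows)

    def close(self) -> None:
        # Optional: AsyncGraphSession calls close() on exit when it is present.
        self.closed = True


db = FakeDB(rows=[{"p": {"name": "Alice", "age": 31}}])
# session = GraphSession(db)  # a schema is only needed for apply_ddl()
rows = db.execute("MATCH (p:Person) RETURN p", {})
```

Tests can then assert on db.calls to check which Cypher a session method produced, without a running Neo4j server.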

Raw execution

records = session.execute("MATCH (n:Person) RETURN n LIMIT 5", {})

session.execute(cypher, params) is a thin wrapper around db.execute(). Use this for ad-hoc queries you've already constructed and validated.

Query builder execution

q = Query().match(Person, "p").where(Cond("p.age", ">", 30)).return_("p")
records = session.execute_query(q)

execute_query(q) calls q.build() and passes the resulting (cypher, params) tuple to db.execute().
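The forwarding step can be sketched as follows. StubQuery and RecordingDB are hypothetical stand-ins (the real Query builder is not imported here). Whatever build() returns is unpacked and passed to db.execute() unchanged.

```python
from __future__ import annotations

from typing import Any, Protocol


class Buildable(Protocol):
    def build(self) -> tuple[str, dict[str, Any]]: ...


class StubQuery:
    """Hypothetical stand-in for a Query builder: build() yields (cypher, params)."""

    def __init__(self, cypher: str, params: dict[str, Any]) -> None:
        self._cypher, self._params = cypher, params

    def build(self) -> tuple[str, dict[str, Any]]:
        return self._cypher, self._params


class RecordingDB:
    def __init__(self) -> None:
        self.seen: list[tuple[str, dict[str, Any]]] = []

    def execute(self, cypher: str, params: dict[str, Any]) -> list[dict[str, Any]]:
        self.seen.append((cypher, params))
        return []


def execute_query(db: Any, q: Buildable) -> list[dict[str, Any]]:
    # Unpack the (cypher, params) pair and forward it as-is.
    cypher, params = q.build()
    return db.execute(cypher, params)


db = RecordingDB()
q = StubQuery("MATCH (p:Person) WHERE p.age > $age RETURN p", {"age": 30})
result = execute_query(db, q)
```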

Typed query

people = session.query(
    Person,
    var="p",
    where={"name": "Alice"},
    limit=10,
    order_by="p.age",
)
# Returns list[Person] — hydrated Pydantic instances.

query() is the typed shortcut: it builds a MATCH (var:Label) WHERE …, executes it, and hydrates the records via Person.from_records(records, var).
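The following example makes the shape of the generated query concrete. It uses a hypothetical build_match() helper that turns query()'s keyword arguments into parameterised Cypher. The exact text cypher_validator emits may differ, and the hydration step (Person.from_records) is left out.

```python
from __future__ import annotations

from typing import Any


def build_match(label: str, var: str = "n", where: dict[str, Any] | None = None,
                order_by: str | None = None,
                limit: int | None = None) -> tuple[str, dict[str, Any]]:
    """Hypothetical helper: MATCH (var:Label) WHERE ... RETURN ... ORDER BY ... LIMIT ..."""
    parts = [f"MATCH ({var}:{label})"]
    params: dict[str, Any] = {}
    if where:
        # One parameter per equality filter, so values are never spliced into the string.
        conds = []
        for key, value in where.items():
            params[key] = value
            conds.append(f"{var}.{key} = ${key}")
        parts.append("WHERE " + " AND ".join(conds))
    parts.append(f"RETURN {var}")
    if order_by:
        parts.append(f"ORDER BY {order_by}")
    if limit is not None:
        parts.append("LIMIT $limit")
        params["limit"] = limit
    return " ".join(parts), params


cypher, params = build_match("Person", var="p", where={"name": "Alice"},
                             order_by="p.age", limit=10)
```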

Create / merge / delete

session.create(Person(name="Bob", age=25))
session.merge(Person(name="Alice", age=31), merge_keys=["name"])
session.delete(Person, where={"name": "Bob"}, detach=True)

Bulk operations

session.bulk_create(Person, [
    {"name": "Carol", "age": 28},
    {"name": "Dave", "age": 33},
])

session.bulk_merge(
    Person,
    [{"name": "Eve", "age": 29}, {"name": "Frank", "age": 41}],
    merge_keys=["name"],
)

These delegate to BulkOps under the hood. Each call sends one UNWIND query, so the whole batch reaches the database in a single round trip.
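A single round trip is possible because the whole batch travels as one list parameter, which UNWIND expands row by row on the server. The helpers below sketch that pattern under the assumption that BulkOps generates something similar. The query text and the rows parameter name are illustrative, not taken from the library.

```python
from __future__ import annotations

from typing import Any


def unwind_create(label: str, rows: list[dict[str, Any]]) -> tuple[str, dict[str, Any]]:
    # One query for the whole batch: the list is sent as a single parameter.
    return f"UNWIND $rows AS row CREATE (n:{label}) SET n = row", {"rows": rows}


def unwind_merge(label: str, rows: list[dict[str, Any]],
                 merge_keys: list[str]) -> tuple[str, dict[str, Any]]:
    # MERGE on the key properties only, then copy all row properties onto the node.
    key_map = ", ".join(f"{k}: row.{k}" for k in merge_keys)
    return f"UNWIND $rows AS row MERGE (n:{label} {{{key_map}}}) SET n += row", {"rows": rows}


cypher, params = unwind_merge("Person", [{"name": "Eve", "age": 29}], merge_keys=["name"])
```

Merging only on merge_keys matters. If every property were in the MERGE pattern, a changed age would create a second Alice instead of updating the existing node.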

Relationships

session.create_relationship(
    ActedIn(roles=["Trinity"]),
    src_match={"name": "Carrie-Anne Moss"},
    tgt_match={"title": "The Matrix"},
)

Traversal helpers

neighbors = session.neighbors(Person, match_props={"name": "Alice"})
path = session.shortest_path(
    Person, Person,
    src_props={"name": "Alice"},
    tgt_props={"name": "Bob"},
)

These build the query with Traversal.neighbors / Traversal.shortest_path and then execute it.

Vector search

results = session.vector_search(
    Document, "embedding", query_vector, top_k=5
)
# Returns a list of dicts such as {"node": Document(...), "score": 0.95}

vector_search(model, index_property, query_vector, top_k=10) derives the vector index name from the model's label and property, executes the search, and hydrates the results into Pydantic model instances.
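In recent Neo4j 5 releases, vector indexes are queried with the db.index.vector.queryNodes(indexName, k, vector) procedure, which yields node and score. The sketch below assumes the session uses that procedure. It builds the call and hydrates the rows into the {"node": ..., "score": ...} shape. The naming scheme in vector_index_name() is a hypothetical placeholder, since the real derivation is not documented here. A dataclass stands in for the Pydantic model.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


def vector_index_name(label: str, prop: str) -> str:
    # Hypothetical naming scheme; the library's convention may differ.
    return f"{label.lower()}_{prop}_index"


def vector_search_query(label: str, prop: str, query_vector: list[float],
                        top_k: int = 10) -> tuple[str, dict[str, Any]]:
    # Neo4j vector-index procedure: yields each matched node and its similarity score.
    cypher = (
        "CALL db.index.vector.queryNodes($index, $k, $vector) "
        "YIELD node, score RETURN node, score"
    )
    return cypher, {"index": vector_index_name(label, prop), "k": top_k, "vector": query_vector}


@dataclass
class Document:  # stand-in for the Pydantic model
    title: str


def hydrate(records: list[dict[str, Any]], model: type) -> list[dict[str, Any]]:
    # Turn each raw node property map into a model instance, keeping the score.
    return [{"node": model(**r["node"]), "score": r["score"]} for r in records]


cypher, params = vector_search_query("Document", "embedding", [0.1, 0.2], top_k=5)
hits = hydrate([{"node": {"title": "Graph RAG"}, "score": 0.95}], Document)
```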

semantic_search() adds an embedding step — pass text instead of a vector:

from cypher_validator.embeddings import OpenAIEmbeddings
embed = OpenAIEmbeddings()

results = session.semantic_search(
    Document, "embedding", "what is graph RAG?", embed, top_k=5
)
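Conceptually, semantic_search() is vector_search() with an embedding call in front. This sketch models the embedder as a plain text -> vector callable. This page does not show how OpenAIEmbeddings is actually invoked, so treat embed and search as hypothetical parameters.

```python
from __future__ import annotations

from typing import Any, Callable

Vector = list[float]


def semantic_search(
    search: Callable[[Vector, int], list[dict[str, Any]]],
    embed: Callable[[str], Vector],
    text: str,
    top_k: int = 10,
) -> list[dict[str, Any]]:
    # Step 1: turn the text into a vector. Step 2: reuse the vector search unchanged.
    return search(embed(text), top_k)


fake_embed = lambda s: [float(len(s))]                   # toy embedder
fake_search = lambda v, k: [{"vector": v, "top_k": k}]   # echoes its inputs
result = semantic_search(fake_search, fake_embed, "abc", top_k=3)
```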

Both methods are available on GraphSession and AsyncGraphSession (use await for async).

See Vector search for the full workflow including index creation and embedding adapters.

apply_ddl(include_existence=False) → list[str]

session.apply_ddl()
# Runs every CREATE CONSTRAINT / CREATE INDEX from SchemaDDL.generate_all().
# Returns the list of statements that were executed.

Requires the session to have been constructed with a schema. Run it once at deployment or as part of a migration. Passing include_existence=True also emits property-existence (IS NOT NULL) constraints, which require Neo4j Enterprise Edition.
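The loop behind apply_ddl() can be sketched like this: execute each generated statement in order and return the ones that ran. ddl_for() is a hypothetical generator using Neo4j 5 constraint syntax (IS UNIQUE, plus IS NOT NULL for existence). SchemaDDL.generate_all() may name and select constraints differently.

```python
from __future__ import annotations

from typing import Any


def ddl_for(label: str, unique_props: list[str], required_props: list[str],
            include_existence: bool = False) -> list[str]:
    # Hypothetical statement generator; constraint names are illustrative.
    stmts = [
        f"CREATE CONSTRAINT {label.lower()}_{p}_unique IF NOT EXISTS "
        f"FOR (n:{label}) REQUIRE n.{p} IS UNIQUE"
        for p in unique_props
    ]
    if include_existence:  # property-existence constraints need Enterprise Edition
        stmts += [
            f"CREATE CONSTRAINT {label.lower()}_{p}_exists IF NOT EXISTS "
            f"FOR (n:{label}) REQUIRE n.{p} IS NOT NULL"
            for p in required_props
        ]
    return stmts


def apply_ddl(db: Any, statements: list[str]) -> list[str]:
    # Run each statement in order and report what was executed.
    executed = []
    for stmt in statements:
        db.execute(stmt, {})
        executed.append(stmt)
    return executed


class ListDB:
    def __init__(self) -> None:
        self.log: list[str] = []

    def execute(self, cypher: str, params: dict[str, Any]) -> list[dict[str, Any]]:
        self.log.append(cypher)
        return []


db = ListDB()
ran = apply_ddl(db, ddl_for("Person", ["name"], ["name"], include_existence=True))
```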

Context manager

with GraphSession(db, schema) as session:
    session.execute(...)
# (no automatic db.close() — the session does not own the database connection)

GraphSession defines __enter__ / __exit__ for symmetry with the async version, but it intentionally does not close the underlying database — db is shared and managed by the caller.
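Here is a minimal sketch of this borrowing pattern. The context manager returns itself on entry and leaves db untouched on exit, so several sessions can share one connection. BorrowingSession and SharedDB are hypothetical names.

```python
from __future__ import annotations

from typing import Any


class BorrowingSession:
    """Sketch: a sync context manager that uses, but does not own, the db."""

    def __init__(self, db: Any) -> None:
        self.db = db

    def __enter__(self) -> BorrowingSession:
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Deliberately no self.db.close(): the caller created db and controls its lifetime.
        return False  # never swallow exceptions


class SharedDB:
    def __init__(self) -> None:
        self.closed = False

    def execute(self, cypher: str, params: dict[str, Any]) -> list[dict[str, Any]]:
        return []

    def close(self) -> None:
        self.closed = True


db = SharedDB()
with BorrowingSession(db) as s:
    s.db.execute("RETURN 1", {})
open_after_exit = not db.closed  # the session left db alone
# ... later, at application shutdown, the caller runs db.close() once.
```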

AsyncGraphSession

The async sibling. Use it inside asyncio apps or LLM agent frameworks.

from cypher_validator import AsyncGraphSession

async with AsyncGraphSession(db, schema) as session:
    people = await session.query(Person, where={"name": "Alice"})
    await session.create(Person(name="Bob", age=25))
    await session.bulk_create(Person, [...])

Constructor

AsyncGraphSession(db: Any, schema: GraphSchema | None = None)

Same parameters as GraphSession. The session keeps a QueryHistory for LLM context.

How execution dispatches

async def execute(self, cypher, params): ...

The implementation checks whether db.execute is a coroutine function:

  • If yes → await db.execute(cypher, params) directly.
  • If no → loop.run_in_executor(None, db.execute, cypher, params) — runs the sync call in a thread pool, so the event loop is not blocked.

This means you can use a synchronous Neo4jDatabase with AsyncGraphSession without porting the driver layer.
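The dispatch rule fits in a few lines using inspect.iscoroutinefunction and loop.run_in_executor. This is a standalone sketch of the behaviour described above, not the library's source. SyncDB and AsyncDB are hypothetical drivers that exercise both branches.

```python
from __future__ import annotations

import asyncio
import inspect
from typing import Any


async def dispatch_execute(db: Any, cypher: str, params: dict[str, Any]) -> list[dict[str, Any]]:
    if inspect.iscoroutinefunction(db.execute):
        # Async driver: await it directly on the event loop.
        return await db.execute(cypher, params)
    # Sync driver: run it in the default thread pool so the loop is not blocked.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, db.execute, cypher, params)


class SyncDB:
    def execute(self, cypher: str, params: dict[str, Any]) -> list[dict[str, Any]]:
        return [{"mode": "sync", "cypher": cypher}]


class AsyncDB:
    async def execute(self, cypher: str, params: dict[str, Any]) -> list[dict[str, Any]]:
        await asyncio.sleep(0)  # yield to the loop, as a real network call would
        return [{"mode": "async", "cypher": cypher}]


async def main() -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    return (
        await dispatch_execute(SyncDB(), "RETURN 1", {}),
        await dispatch_execute(AsyncDB(), "RETURN 2", {}),
    )


sync_rows, async_rows = asyncio.run(main())
```

Passing None as the executor selects the loop's default ThreadPoolExecutor, which is what lets a blocking driver run alongside other coroutines.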

Methods

The async API mirrors GraphSession:

| Async method | Sync equivalent |
| --- | --- |
| await session.execute(cypher, params) | session.execute() |
| await session.execute_query(q) | session.execute_query() |
| await session.query(Person, where=..., limit=...) | session.query() |
| await session.create(instance) | session.create() |
| await session.merge(instance, merge_keys=…) | session.merge() |
| await session.bulk_create(model, items) | session.bulk_create() |
| await session.bulk_merge(model, items, merge_keys=…) | session.bulk_merge() |
| await session.neighbors(model, match_props=…) | session.neighbors() |

__aenter__ / __aexit__

async with AsyncGraphSession(db, schema) as session:
    ...
# On __aexit__, db.close() is called if it exists.

Note: unlike GraphSession, the async exit does call db.close() if available, on the assumption that you're holding the session over a long-lived async scope.
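The close-on-exit behaviour can be sketched as an async context manager that looks up close with getattr and calls it only when present. This sketch also awaits the result when close() turns out to be a coroutine. That part is an assumption, since the page only says db.close() is called. OwningAsyncSession, ClosableDB and NoCloseDB are hypothetical names.

```python
from __future__ import annotations

import asyncio
import inspect
from typing import Any


class OwningAsyncSession:
    def __init__(self, db: Any) -> None:
        self.db = db

    async def __aenter__(self) -> OwningAsyncSession:
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        close = getattr(self.db, "close", None)
        if callable(close):
            result = close()
            if inspect.isawaitable(result):  # assumption: tolerate an async close()
                await result
        return False  # let exceptions propagate


class ClosableDB:
    def __init__(self) -> None:
        self.closed = False

    def execute(self, cypher: str, params: dict[str, Any]) -> list[dict[str, Any]]:
        return []

    def close(self) -> None:
        self.closed = True


class NoCloseDB:
    def execute(self, cypher: str, params: dict[str, Any]) -> list[dict[str, Any]]:
        return []


async def main() -> tuple[ClosableDB, NoCloseDB]:
    a, b = ClosableDB(), NoCloseDB()
    async with OwningAsyncSession(a):
        pass
    async with OwningAsyncSession(b):  # no close() method: exit is a no-op
        pass
    return a, b


closable, plain = asyncio.run(main())
```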

History

AsyncGraphSession keeps a QueryHistory of every executed query — including failures. This is invaluable for LLM agents that need to know what they've already tried:

print(session.history.to_context())
# ## Previous Queries
# 1. `MATCH (p:Person {name: $name}) RETURN p` → 1 results
# 2. `MATCH (p:Person)-[:KNOWS]->(f) RETURN f` → error: SyntaxError ...

QueryHistory API (max_entries defaults to 50):

| Method | Notes |
| --- | --- |
| history.add(cypher, params, result_count, error, summary) | Append a record. |
| history.add_from_result(result: QueryResult) | Append a record pulled from a QueryResult. |
| history.entries | List of QueryHistoryEntry. |
| history.last | Most recent entry, or None. |
| history.clear() | Reset. |
| history.to_context(last_n=None) | LLM-formatted summary. |
| history.to_list() | List of dicts (for serialisation). |
| history.successful_queries() | Entries where error is None. |
| history.failed_queries() | Entries where error is not None. |
| history.find_similar(cypher) | Substring-based lookup. |
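The sketch below shows how these pieces fit together. MiniHistory is a hypothetical re-implementation of a subset of QueryHistory (add, last, the success/failure filters, find_similar and to_context), not the library's class. Two behaviours are assumptions: the oldest entry is evicted first once max_entries is reached, and find_similar matches when the given text is a substring of a stored query.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass
from typing import Any


@dataclass
class Entry:
    cypher: str
    params: dict[str, Any]
    result_count: int | None = None
    error: str | None = None


class MiniHistory:
    """Sketch of the QueryHistory idea, not the library's implementation."""

    def __init__(self, max_entries: int = 50) -> None:
        # Assumption: once full, the oldest entry is dropped first.
        self._entries: deque[Entry] = deque(maxlen=max_entries)

    def add(self, cypher: str, params: dict[str, Any] | None = None,
            result_count: int | None = None, error: str | None = None) -> None:
        self._entries.append(Entry(cypher, dict(params or {}), result_count, error))

    @property
    def last(self) -> Entry | None:
        return self._entries[-1] if self._entries else None

    def successful_queries(self) -> list[Entry]:
        return [e for e in self._entries if e.error is None]

    def failed_queries(self) -> list[Entry]:
        return [e for e in self._entries if e.error is not None]

    def find_similar(self, cypher: str) -> list[Entry]:
        # Assumption: plain substring match against stored query text.
        return [e for e in self._entries if cypher in e.cypher]

    def to_context(self, last_n: int | None = None) -> str:
        entries = list(self._entries)
        if last_n is not None:
            entries = entries[max(0, len(entries) - last_n):]
        lines = ["## Previous Queries"]
        for i, e in enumerate(entries, start=1):
            outcome = f"error: {e.error}" if e.error is not None else f"{e.result_count} results"
            lines.append(f"{i}. `{e.cypher}` → {outcome}")
        return "\n".join(lines)


h = MiniHistory(max_entries=2)
h.add("MATCH (p:Person {name: $name}) RETURN p", {"name": "Alice"}, result_count=1)
h.add("MATCH (p:Person)-[:KNOWS]->(f) RETURN f", error="SyntaxError")
h.add("MATCH (m:Movie) RETURN m", result_count=3)  # evicts the first entry
```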

Picking sync vs async

| Situation | Pick |
| --- | --- |
| Plain script or CLI tool | GraphSession |
| Inside asyncio.run(...) / FastAPI / LangChain async agent | AsyncGraphSession |
| Large LLM batch ingestion with concurrent rate-limited calls | AsyncGraphSession + the async LLMNLToCypher |

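The two share the same on-the-wire behaviour, so switching is mostly a matter of awaiting the methods. The one lifecycle difference to watch is on exit: AsyncGraphSession calls db.close() if it exists, while GraphSession leaves db open.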