In the last challenge, you used session.run() to execute a Cypher statement in an auto-commit transaction.
While this method may be convenient for one-off statements, you shouldn’t use session.run() in production applications.
When using Neo4j in a clustered environment, this method will not attempt to retry failed Cypher statements caused by transient problems such as changes to the cluster topology or temporary connection problems.
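To make the idea of retrying transient failures concrete, here is a minimal sketch of a retry loop. It is not the Driver's implementation: `runWithRetry`, `transientError` and `isTransient` are hypothetical names, the unit of work is synchronous for brevity, and there is no delay between attempts.

```typescript
// Hypothetical helper: builds an error flagged as transient
// (for example, a change to the cluster topology).
function transientError(message: string): Error {
  const error: Error & { transient?: boolean } = new Error(message)
  error.transient = true
  return error
}

// Hypothetical check: only errors flagged as transient are worth retrying.
function isTransient(error: unknown): boolean {
  if (typeof error !== 'object' || error === null) return false
  return (error as { transient?: boolean }).transient === true
}

// Run `work`, retrying only when it fails with a transient error.
// Any other error is rethrown immediately; the last transient error is
// rethrown once the attempts are exhausted.
function runWithRetry<T>(work: () => T, maxAttempts = 3): T {
  if (maxAttempts < 1) throw new Error('maxAttempts must be at least 1')
  let lastError: unknown
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return work()
    } catch (error) {
      if (!isTransient(error)) throw error
      lastError = error
    }
  }
  throw lastError
}

// Usage: a unit of work that fails twice before succeeding.
let attemptsSeen = 0
const outcome = runWithRetry(() => {
  attemptsSeen++
  if (attemptsSeen < 3) throw transientError('cluster topology changed')
  return 'ok'
})
console.log(outcome, attemptsSeen) // ok 3
```

A statement sent with session.run() gets no such second chance, which is why a single transient failure surfaces directly to the application.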
Neo4j Clusters
When running Neo4j in production or using Neo4j Aura, you will most likely be querying a cluster of nodes rather than a single instance.
In a clustered Neo4j environment, each server will run in either Primary or Secondary mode.
In Neo4j’s clustering architecture, Primary servers are responsible for handling read and write operations, ensuring high availability and data durability.
Secondary servers are added to the cluster to scale out read workloads by asynchronously replicating data from the primaries.
In a clustered environment, the Driver can connect to any member within the cluster. Once connected, the Driver will receive metadata about the cluster, known as a Routing Table.
Reading Data from Neo4j
The Driver uses data from a Routing Table to distribute read statements across the cluster in a least-connected fashion, ensuring that read workloads are spread evenly across the cluster.
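As a rough illustration of least-connected selection, the sketch below picks the server with the fewest connections in use. The `ReaderEntry` shape, the `pickLeastConnected` function and the server addresses are all hypothetical; the Driver keeps its own bookkeeping internally and this is not its code.

```typescript
// Hypothetical shape of one reader in a routing table.
interface ReaderEntry {
  address: string
  activeConnections: number
}

// Pick the reader with the fewest connections currently in use.
// On a tie, the reader listed first wins.
function pickLeastConnected(readers: ReaderEntry[]): ReaderEntry {
  if (readers.length === 0) throw new Error('no readers available')
  return readers.reduce((best, candidate) =>
    candidate.activeConnections < best.activeConnections ? candidate : best
  )
}

const readers: ReaderEntry[] = [
  { address: 'server-a:7687', activeConnections: 4 },
  { address: 'server-b:7687', activeConnections: 1 },
  { address: 'server-c:7687', activeConnections: 3 },
]

const chosen = pickLeastConnected(readers)
chosen.activeConnections++ // the new read statement now occupies a connection
console.log(chosen.address) // server-b:7687
```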
To execute a Cypher statement within a read transaction, call the executeRead() method.
// Execute cypher in a read transaction
const res = await session.executeRead(
(tx: ManagedTransaction) => tx.run<T>(cypher, params)
)
The executeRead() method executes a unit of work, represented as a callback function, before marking the transaction as successful.
The callback function is passed an instance of a ManagedTransaction, on which the run() method can be called multiple times to execute Cypher statements.
The run() method is similar to session.run() in that it accepts two parameters: a Cypher statement and an object representing the query parameters.
Writing Data to Neo4j
The cluster will always contain one instance, known as the Leader, which has the responsibility of processing write transactions. The Leader is responsible for distributing the transaction’s outcome across the cluster. Once the majority of Primary servers acknowledge this information, the transaction is considered committed and an acknowledgement is sent back to the Driver along with any data requested by the Cypher statement.
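The majority rule can be expressed as a one-line check. This is only a sketch of the arithmetic: `hasMajority` is a hypothetical function, and the real commit protocol involves more than counting acknowledgements.

```typescript
// True once more than half of the voting servers have acknowledged.
function hasMajority(acknowledged: number, votingServers: number): boolean {
  return acknowledged > Math.floor(votingServers / 2)
}

console.log(hasMajority(1, 3)) // false
console.log(hasMajority(2, 3)) // true
console.log(hasMajority(2, 4)) // false
console.log(hasMajority(3, 5)) // true
```

With three voting servers, for instance, the write can be acknowledged to the Driver as soon as two of them have confirmed it.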
To execute a Cypher statement within a write transaction, call the executeWrite() method.
// Execute cypher in a write transaction
const res = await session.executeWrite(
(tx: ManagedTransaction) => tx.run<T>(cypher, params)
)
Generics
We have not yet covered the <T> generic in the code samples above.
We will cover this later on in this course.
Result Processing
Two methods are available for consuming results from a transaction: the Promise API and the Streaming API.
Promise API
The most common method of consuming results is with the Promise API.
Both session.run() and tx.run() return an instance of a Result, a Promise-like class upon which you can chain .then(), .catch() and .finally() methods.
Once the statement has been executed, and all results returned, the Result will resolve to a QueryResult object.
// Execute cypher in a read transaction
const res = await session.executeRead(
(tx: ManagedTransaction) => tx.run<T>(cypher, params)
)
// res.records is type `Record[]`
res.records.map((record: Record) => {
// e.g. record.get('key')
})
You can also iterate over the result returned by tx.run() or session.run() within a for await loop.
const res = tx.run<T>(cypher, params)
for await (const record of res) {
console.log(record.get('Name'));
}
If you have long-running queries that explore a large part of the graph, it may take a while for the results to become available. In this case, you should use the Streaming API.
Streaming API
There may be occasions where you have slower-running or more complex queries but wish to process results as they become available.
For example, many flight search websites have complex queries that take up to a minute to complete, but shorter, less complex graph patterns may be available immediately.
In this case, you can use the subscribe() method to consume results as they become available.
This method is available when calling session.run() or tx.run() within a transaction function.
You can use this method with WebSockets or similar technologies to update the front-end application with new results as they are made available.
The subscribe() method accepts one argument, an object of callbacks:
- onKeys - called when keys are made available for all records.
- onNext - called when the next Record is available.
- onError - called if an error occurs.
- onCompleted - called when all records have been consumed, and no error has occurred.
// Execute cypher in a write transaction
session.executeWrite(
  (tx: ManagedTransaction) =>
    tx.run<T>(cypher, params)
      .subscribe({
        onKeys: (keys: string[]) => {
          console.log(keys) // ['p', 'r', 'm']
        },
        onNext: (record: Record) => {
          console.log(record.get('p')) // A Node with label `Person`
        },
        onCompleted: (summary: ResultSummary) => {
          // A `summary` of the query, including execution time and update counters
          // Close the Session
          session.close()
        },
        onError: (error: Error) => {
          console.log(error)
        }
      })
)
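The order in which the callbacks fire can be seen without a database by using an in-memory stand-in. In this sketch `RowObserver` and `streamRows` are hypothetical names and the rows are made-up sample data; unlike the Driver's onCompleted, the stand-in does not pass a summary.

```typescript
// Hypothetical observer shape mirroring the four callbacks listed above.
interface RowObserver<Row> {
  onKeys?: (keys: string[]) => void
  onNext?: (row: Row) => void
  onCompleted?: () => void
  onError?: (error: Error) => void
}

// Hypothetical in-memory stand-in for a result stream: announces the keys,
// pushes each row to the observer in turn, then signals completion.
// Any error raised along the way is routed to onError instead.
function streamRows<Row>(keys: string[], rows: Row[], observer: RowObserver<Row>): void {
  try {
    if (observer.onKeys) observer.onKeys(keys)
    for (const row of rows) {
      if (observer.onNext) observer.onNext(row)
    }
    if (observer.onCompleted) observer.onCompleted()
  } catch (error) {
    const wrapped = error instanceof Error ? error : new Error(String(error))
    if (observer.onError) observer.onError(wrapped)
  }
}

const events: string[] = []
streamRows(['title'], [{ title: 'The Matrix' }, { title: 'Cloud Atlas' }], {
  onKeys: (keys) => { events.push('keys:' + keys.join(',')) },
  onNext: (row) => { events.push('next:' + row.title) },
  onCompleted: () => { events.push('completed') },
  onError: (error) => { events.push('error:' + error.message) },
})
console.log(events)
// [ 'keys:title', 'next:The Matrix', 'next:Cloud Atlas', 'completed' ]
```

Because onNext fires once per row, a front end can render the first result while later rows are still on their way.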
Individual Results
In both examples above, individual records are accessed through an implementation of the Record type exported from neo4j-driver.
The Record type contains methods for interacting with each Record.
The .keys() method provides an array of available keys on the Record, and the .has() method can be called to check that a key exists on the Record.
console.log(record.keys()) // ['p', 'r', 'm']
console.log(record.has('x')) // false
There are several ways to iterate over a record; .forEach() and .map() work as if the record were an array, and you can also call .entries() or .values() to act on the Record as if it were a Map object.
You can also access values within a for loop.
for (const [ key, value ] of record.entries()) {
  console.log(key); // e.g. 'p'
  console.log(value); // e.g. a `Node` instance
}
To retrieve an individual value from a Record, call the .get() method.
The method accepts a single parameter, the alias of the value stated in the RETURN clause or the 0-based index of the value.
console.log(record.get('p')) // a (:Person) node
console.log(record.get(0)) // `p` is the first value in the RETURN statement
You can quickly convert the Record to a JavaScript object using the .toObject() method, a convenient way of converting nested values without recursively iterating through the Record.
console.log(record.toObject()) // {p: Node, r: Relationship, m: Node}
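To show how lookup by alias and lookup by 0-based index relate, here is a small stand-in class that can be run without a database. `SketchRecord` is hypothetical and is not the Record class exported from neo4j-driver; plain strings take the place of Node and Relationship values, and throwing on an unknown key is an assumption of the sketch.

```typescript
// Hypothetical stand-in for a record: keys and values held in parallel arrays.
class SketchRecord {
  private keyList: string[]
  private valueList: unknown[]

  constructor(keyList: string[], valueList: unknown[]) {
    this.keyList = keyList
    this.valueList = valueList
  }

  // Resolve an alias or a 0-based index to a position, or -1 if absent.
  private positionOf(key: string | number): number {
    const index = typeof key === 'number' ? key : this.keyList.indexOf(key)
    return index >= 0 && index < this.keyList.length ? index : -1
  }

  keys(): string[] {
    return this.keyList.slice()
  }

  has(key: string | number): boolean {
    return this.positionOf(key) !== -1
  }

  get(key: string | number): unknown {
    const index = this.positionOf(key)
    if (index === -1) throw new Error('no field with key ' + String(key))
    return this.valueList[index]
  }

  toObject(): { [key: string]: unknown } {
    const result: { [key: string]: unknown } = {}
    this.keyList.forEach((key, index) => {
      result[key] = this.valueList[index]
    })
    return result
  }
}

const sketch = new SketchRecord(['p', 'r', 'm'], ['a person', 'a relationship', 'a movie'])
console.log(sketch.keys()) // [ 'p', 'r', 'm' ]
console.log(sketch.has('x')) // false
console.log(sketch.get('p') === sketch.get(0)) // true
console.log(sketch.toObject()) // { p: 'a person', r: 'a relationship', m: 'a movie' }
```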
Check Your Understanding
Reading from Neo4j
Which method would you use to read data from Neo4j?
- ❏ session.read()
- ❏ session.readQuery()
- ✓ session.executeRead()
- ❏ session.executeWrite()
Hint
You are looking to execute a read query against the database.
Solution
The answer is session.executeRead()
Writing to Neo4j
Which method would you use to write data to Neo4j?
- ❏ session.insert()
- ❏ session.write()
- ❏ session.writeQuery()
- ✓ session.executeWrite()
Hint
You are looking to execute a write query against the database.
Solution
The answer is session.executeWrite()
Using the Promise API
What is the drawback of using the Promise API to consume results?
- ❏ The Promise API is only available to Enterprise customers.
- ✓ Results are only available once the Driver has received the final record.
- ❏ You can only use the Promise API within a read transaction.
Hint
If you are not subscribing to the record stream, you will only be able to access the first record once the entire stream has finished.
Solution
Results are only available once the Driver has received the final record. This can provide a negative experience for users waiting for the results of a long-running query.
Getting individual values
Which method would you call to get an individual value from a Record?
- ❏ record['p'].value
- ✓ record.get('p')
- ❏ record.key('p')
- ❏ record.p()
- ❏ record.value('p')
Hint
Each record has a method for getting a value.
Solution
To get the p value from a record, call the record.get('p') method.
Lesson Summary
In this lesson, you learned the best practices for reading data from and writing data to Neo4j.
You also learned how to access the records returned by session.run() and tx.run().
In the next challenge, you will use this knowledge to execute a Cypher statement in a read transaction.