Distributed Iterative Graph Processing (Pregel)
Pregel enables you to do online analytical processing directly on graphs stored in ArangoDB
Distributed graph processing enables you to do online analytical processing directly on graphs stored in ArangoDB. This is intended to help you gain analytical insights into your data, without having to use external processing systems. Examples of algorithms you can execute are PageRank, Vertex Centrality, Vertex Closeness, Connected Components, and Community Detection. For more details, see the overview of all available algorithms in ArangoDB.
Check out the hands-on ArangoDB Pregel Tutorial to learn more.
The processing system inside ArangoDB is based on Pregel: A System for Large-Scale Graph Processing – Malewicz et al. (Google), 2010. This concept enables ArangoDB to perform distributed graph processing without the need for distributed global locking.
This system is not useful for typical online queries, where you only work on a small set of vertices. These kinds of tasks are better suited for AQL traversals.
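For instance, a small point query such as the following neighborhood lookup is a typical AQL traversal use case rather than a Pregel job. This is only a sketch; the graph and vertex names are placeholders reused from the examples further below.

// A local query (e.g. the 1..2-step neighborhood of a single vertex) is better
// served by a plain AQL traversal than by Pregel.
var db = require("@arangodb").db;
db._query('FOR v IN 1..2 OUTBOUND "vertices/V" GRAPH "demograph" RETURN v._key').toArray();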
Prerequisites
If you run a single ArangoDB instance in single-server mode, there are no requirements regarding the modeling of your data. All you need is at least one vertex collection and one edge collection.
In cluster deployments, the collections need to be sharded in a specific way to ensure correct results: The outgoing edges of a vertex need to be on the same DB-Server as the vertex. This is guaranteed by SmartGraphs. Thus, Pregel in cluster deployments is not usable in the Community Edition.
Note that performance may be better if the number of your shards / collections matches the number of CPU cores.
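The following is a minimal sketch of creating such a SmartGraph from arangosh (Enterprise Edition). The graph and collection names as well as the `region` attribute are placeholders, not part of the Pregel API.

// Create a SmartGraph so that each vertex and its outgoing edges end up on the
// same DB-Server, which is what Pregel requires in a cluster.
var smartGraphModule = require("@arangodb/smart-graph");
smartGraphModule._create(
  "demograph",                                                    // graph name (placeholder)
  [smartGraphModule._relation("edges", "vertices", "vertices")],  // edge definition
  [],                                                             // no orphan collections
  { smartGraphAttribute: "region", numberOfShards: 9 }            // assumed sharding setup
);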
JavaScript API
Starting an Algorithm Execution
The Pregel API is accessible through the `@arangodb/pregel` package.
To start an execution, you need to specify the algorithm name and a named graph (SmartGraph in cluster). Alternatively, you can specify the vertex and edge collections. Additionally, you can specify custom parameters which vary for each algorithm. The `start()` method always returns a unique ID (a numeric string) which you can use to interact with the algorithm later on.

The following example shows the `start()` method variant for using a named graph:
var pregel = require("@arangodb/pregel");
var params = {};
var execution = pregel.start("<algorithm>", "<graphname>", params);
You can also specify the vertex and edge collections directly. In this case, the second argument must be an object with the keys `vertexCollections` and `edgeCollections`:
var execution = pregel.start("<algorithm>", { vertexCollections: ["vertices"], edgeCollections: ["edges"] }, params);
The `params` argument needs to be an object with the algorithm settings as described in Pregel Algorithms.
Status of an Algorithm Execution
You can call `pregel.status()` and use the ID returned by the `pregel.start(...)` method to track the status of your algorithm:
var execution = pregel.start("sssp", "demograph", { source: "vertices/V" });
var status = pregel.status(execution);
It tells you the current `state` of the execution, the current global superstep, the runtime, the global aggregator values, as well as the number of sent and received messages.

The `state` field has one of the following values:
| State | Description |
|---|---|
| `"none"` | The Pregel run has not started yet. |
| `"loading"` | The graph data is being loaded from the database into memory before executing the algorithm. |
| `"running"` | The algorithm is executing normally. |
| `"storing"` | The algorithm finished, but the results are still being written back into the collections. Only occurs if the `store` parameter is set to `true`. |
| `"done"` | The execution is done. This means that storing is also done. This event is announced in the server log (requires at least the `info` log level for the `pregel` log topic). |
| `"canceled"` | The execution was permanently canceled, either by the user or by an error. |
| `"in error"` | The execution is in an error state. This can be caused by primary DB-Servers being unreachable or unresponsive. The execution might recover later, or switch to `"canceled"` if it is not able to recover successfully. |
| `"recovering"` | The execution is actively recovering and switches back to `"running"` if the recovery is successful. |
| `"fatal error"` | The execution resulted in a non-recoverable error. |
The object returned by the `status()` method looks like this:
{
"state" : "running",
"gss" : 12,
"totalRuntime" : 123.23,
"aggregators" : {
"converged" : false,
"max" : true,
"phase" : 2
},
"sendCount" : 3240364978,
"receivedCount" : 3240364975
}
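Because `status()` only returns a snapshot, a simple way to wait for a result from arangosh is to poll it, as in the following sketch (the one-second interval is arbitrary):

// Poll the job until it has left all active states
var internal = require("internal");
var status = pregel.status(execution);
while (["loading", "running", "storing", "recovering"].indexOf(status.state) !== -1) {
  internal.sleep(1); // wait a second between checks
  status = pregel.status(execution);
}
print("Pregel job ended with state: " + status.state);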
Canceling an Execution / Discarding results
To cancel an execution which is still running and discard any intermediate results, you can use the `pregel.cancel()` method. This immediately frees all memory taken up by the execution, and you lose all intermediate data.
// start a single source shortest path job
var execution = pregel.start("sssp", "demograph", { source: "vertices/V" });
pregel.cancel(execution);
You might get inconsistent results if you requested to store the results and then cancel an execution when it is already in its `storing` state. The data is written into all collection shards at once by multiple threads, which means there are multiple transactions running simultaneously. A transaction might already be committed when you cancel the execution job. Therefore, you might see some updated documents, while other documents have no results or stale results from a previous execution.
Get persisted execution statistics
You can call `pregel.history()` and use the ID returned by the `pregel.start(...)` method to get the execution statistics of your algorithm, for an active as well as a finished Pregel job:
const execution = pregel.start("sssp", "demograph", { source: "vertices/V" });
const historyStatus = pregel.history(execution);
It tells you the current `state` of the execution, the current global superstep, the runtime, the global aggregator values, as well as the number of sent and received messages. Additionally, you can see which user started the Pregel job. It also contains details about specific execution timings.

The execution statistics are persisted to a system collection and kept until you remove them, whereas the information `pregel.status()` returns is only kept temporarily in memory.
The object returned by the `pregel.history()` method looks like this:
{
"algorithm" : "pagerank",
"created" : "2023-04-18T11:12:50Z",
"database" : "_system",
"graphLoaded" : true,
"gss" : 15,
"id" : "109645",
"state" : "done",
"ttl" : 600,
"user" : "MarcelJansen",
"detail" : {
"aggregatedStatus" : {
"timeStamp" : "2023-04-18T11:12:50Z",
"graphStoreStatus" : {
},
"allGssStatus" : {
"items" : [ ... ]
}
},
"workerStatus" : { ... }
},
"sendCount" : 540,
"aggregators" : {
"convergence" : 0
},
"gssTimes" : [
0.000315457,
0.000484604,
...
],
"receivedCount" : 504,
"expires" : "2023-04-18T11:22:50Z",
"vertexCount" : 36,
"storageTime" : 0.002114291,
"totalRuntime" : 0.046748679,
"parallelism" : 8,
"masterContext" : {
},
"edgeCount" : 36,
"startupTime" : 0.032753187,
"computationTime" : 0.011542727
}
In case you want to read the persisted execution statistics of all currently active and past Pregel jobs, call the `pregel.history()` method without a parameter:
const historyStates = pregel.history();
The call without arguments returns a list of execution statistics for algorithm executions:
[
{
"algorithm": "pagerank",
"...": "..."
},
{
"algorithm": "sssp",
"...": "..."
},
{
"algorithm": "connectedcomponents",
"...": "..."
}
]
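The returned array can be processed like any other JavaScript array, for example to look only at jobs that have finished. The fields used below are the ones shown in the history object above; this is just an illustrative sketch.

// List algorithm, ID, and runtime of all completed Pregel jobs
var finished = pregel.history().filter(function (job) {
  return job.state === "done";
});
finished.forEach(function (job) {
  print(job.algorithm + " (" + job.id + "): " + job.totalRuntime + " s");
});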
Remove persisted execution statistics
You can call `pregel.removeHistory()` and use the ID returned by the `pregel.start(...)` method to remove the persisted execution statistics of a specific Pregel job when you don't need them anymore.
const execution = pregel.start("sssp", "demograph", { source: "vertices/V" });
pregel.removeHistory(execution);
In case you want to remove the persisted execution statistics of all Pregel jobs at once, call `pregel.removeHistory()` without a parameter:
pregel.removeHistory();
AQL integration
When the graph processing subsystem finishes executing an algorithm, the results can either be written back into documents (using `store: true` as a parameter) or kept in memory only (using `store: false`). If the data is persisted, then you can query the documents normally to get access to the results.

If you do not want to store results, then they are only held temporarily, until you call the `cancel()` method, or their time to live (customizable via the `ttl` parameter) is exceeded. The in-memory results can be accessed via the `PREGEL_RESULT()` AQL function. If the results are stored in documents, they are not queryable by `PREGEL_RESULT()`.
The result field names depend on the algorithm in both cases.
For example, you might want to query only the vertices with the highest rank from the result set of a PageRank execution:
FOR v IN PREGEL_RESULT(<jobId>)
FILTER v.result >= 0.01
RETURN v._key
By default, the `PREGEL_RESULT()` AQL function returns the `_key` of each vertex plus the result of the computation. In case the computation was done for vertices from different vertex collections, just the `_key` values may not be sufficient to tell vertices from different collections apart. In this case, `PREGEL_RESULT()` can be given a second parameter `withId`, which makes it return the `_id` values of the vertices as well:
FOR v IN PREGEL_RESULT(<jobId>, true)
FILTER v.result >= 0.01
RETURN v._id
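From the JavaScript API, you can run such a query with `db._query()` and pass the ID returned by `pregel.start(...)` as a bind parameter. This is a sketch; the 0.01 threshold is just the illustrative value from the query above.

// Fetch the in-memory results of a job from arangosh
var db = require("@arangodb").db;
var topVertices = db._query(
  "FOR v IN PREGEL_RESULT(@jobId, true) FILTER v.result >= 0.01 RETURN v._id",
  { jobId: execution } // the ID returned by pregel.start(...)
).toArray();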
Algorithm Parameters
There are a number of general parameters which apply to almost all algorithms (a combined example follows this list):

- `store` (bool): Defaults to `true`. If enabled, the Pregel engine writes results back to the database. If the value is `false`, then you can query the results with `PREGEL_RESULT()` in AQL. See AQL integration.
- `maxGSS` (number): Maximum number of global iterations for this algorithm.
- `parallelism` (number): Number of parallel threads to use per worker. Does not influence the number of threads used to load or store data from the database (this depends on the number of shards).
- `resultField` (string): Most algorithms use this as the attribute name for the result. Some use it as a prefix for multiple result attributes. Defaults to `"result"`.
- `shardKeyAttribute` (string): The shard key that edge collections are sharded by (default: `"vertex"`).
- `ttl` (number): The time to live (TTL) defines for how long (in seconds) the Pregel run is kept in memory after it finished with state `done`, `error`, or `fatal error`. Defaults to 600.
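For example, a `start()` call that sets several of these general parameters might look like this. The graph name and the values are purely illustrative.

// Start a PageRank run with some of the general parameters set
var pregel = require("@arangodb/pregel");
var execution = pregel.start("pagerank", "demograph", {
  store: true,         // write the results back to the vertex documents
  maxGSS: 50,          // run at most 50 global supersteps
  parallelism: 4,      // number of threads per worker
  resultField: "rank", // store the result in the "rank" attribute
  ttl: 600             // keep the run information in memory for 10 minutes after it finishes
});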
Limits
Pregel algorithms in ArangoDB store temporary vertex and edge data in main memory by default. For large datasets, this can cause problems, as servers may run out of memory while loading the data.
Parts of the Pregel temporary results (aggregated messages) may also be stored in main memory. That means that algorithms which need to store a lot of result messages temporarily may consume a lot of main memory.
In general, it is also recommended to set the `store` attribute of Pregel jobs to `true`, to make jobs write the results back to disk and not just hold them in main memory. This way, the results are removed from main memory once a Pregel job completes.
If the `store` attribute is explicitly set to `false`, result sets of completed Pregel runs are not removed from main memory until you explicitly discard them by calling the `cancel()` method (or shutting down the server).
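A minimal sketch of this pattern, assuming an in-memory run whose results you read once via `PREGEL_RESULT()` and then discard explicitly:

// Run in memory only, then free the result set once it is no longer needed
var pregel = require("@arangodb/pregel");
var execution = pregel.start("pagerank", "demograph", { store: false });
// ... wait until pregel.status(execution).state is "done" and read the results ...
pregel.cancel(execution); // releases the result set held in main memory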