1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// SPDX-FileCopyrightText: 2023 Thomas Kramer <code@tkramer.ch>
//
// SPDX-License-Identifier: AGPL-3.0-or-later

//! Graph levelization based on an operator formulation.
//! Compute the level of all graph nodes in a directed acyclic graph.
//! The level of a node is the minimum distance to a node without incoming edges with each edge having a distance of `1`.

use std::{
    marker::PhantomData,
    sync::atomic::{AtomicU32, Ordering},
};

use petgraph::{
    data::DataMap,
    visit::{Data, EdgeRef, GraphBase, IntoEdgesDirected},
};

use pargraph::{local_view::LocalGraphView, worklists::WorklistPush, ReadonlyOperator};

#[derive(Clone, Default)]
pub struct LevelizeOp<N> {
    _node_data_type: PhantomData<N>,
}

/// Get a reference to an atomic variable which represents the level of a node.
/// Used as an indirection to make the levelization algorithm more generic.
pub trait AtomicNodeLevel {
    fn node_level(&self) -> &AtomicU32;
}

impl<G, N> ReadonlyOperator<G> for LevelizeOp<N>
where
    G: GraphBase + Data<NodeWeight = N> + IntoEdgesDirected + DataMap,
    N: AtomicNodeLevel,
{
    type WorkItem = G::NodeId;

    fn op(
        &self,
        _work_item: Self::WorkItem,
        local_view: LocalGraphView<&G>,
        mut worklist: impl WorklistPush<G::NodeId>,
    ) {
        let active_node = local_view.active_node();

        let local_view = &local_view;

        // Compute the current level from the levels of the incoming nodes.
        let level: u32 = local_view
            .edges_directed(active_node, petgraph::Direction::Incoming)
            .map(|e| {
                let n = e.source();
                local_view
                    .node_weight(n)
                    .expect("node has no weight")
                    .node_level()
                    .load(Ordering::Relaxed)
            })
            .map(|l| l.min(u32::MAX - 1) + 1) // Increment level and avoid overflow with unitialized level values.
            .min()
            .unwrap_or(u32::MAX);

        // Atomically store the level.
        local_view
            .node_weight(active_node)
            .expect("active node has no weight")
            .node_level()
            .fetch_min(level, Ordering::Relaxed);

        // Create new activities.
        local_view
            .edges_directed(active_node, petgraph::Direction::Outgoing)
            .for_each(|e| worklist.push(e.target()));
    }
}