Skip to content

[GLE-10737] fix(algo): fix tg_maxflow algorithm (created by autoport pipeline from GLE-10323 | gsql-graph-algorithms#179); #181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 12, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 106 additions & 159 deletions algorithms/Path/maxflow/tg_maxflow.gsql
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ CREATE QUERY tg_maxflow(VERTEX source, VERTEX sink, Set<STRING> v_type, SET<STRI
BOOL print_results = TRUE, BOOL display_edges = TRUE, STRING file_path = "") SYNTAX V1 {

/*
First Author: <First Author Name>
First Commit Date: <First Commit Date>
First Author: David Zelong Fan
First Commit Date: Jun 6, 2025

Recent Author: <Recent Commit Author Name>
Recent Commit Date: <Recent Commit Date>
Expand Down Expand Up @@ -50,178 +50,125 @@ CREATE QUERY tg_maxflow(VERTEX source, VERTEX sink, Set<STRING> v_type, SET<STRI
file to write CSV output to
*/

TYPEDEF TUPLE<INT prev_flow, BOOL is_forward, VERTEX prev> tb_node;
GroupByAccum<VERTEX source, VERTEX targ, SumAccum<FLOAT> flow> @@group_by_flow_accum;
SetAccum<VERTEX> @@curr_set;
// variables used in max flow computation
GroupByAccum<VERTEX v_from, VERTEX v_to, SumAccum<INT> flow> @@flow_gb;
GroupByAccum<VERTEX v_from, VERTEX v_to, SumAccum<INT> flow> @@residual_gb;
MinAccum<INT> @@path_flow;

// variables used in bfs
ListAccum<VERTEX> @path_list;
OrAccum @end_point;
ListAccum<ListAccum<VERTEX>> @@total_path_list;
ListAccum<VERTEX> @@shortest_path;

// variables used for printing
FILE f(file_path);
SetAccum<EDGE> @@edges_set;
HeapAccum<tb_node>(1, prev_flow DESC) @trace_back_heap;

MaxAccum<FLOAT> @@max_cap_threshold;
SumAccum<FLOAT> @@sum_max_flow = 0;
MinAccum<FLOAT> @@min_flow;
OrAccum @or_is_visited, @@or_is_found;
BOOL minimum_reached = FALSE;
FILE f(file_path);
@@max_cap_threshold = min_flow_threshhold;

IF cap_type NOT IN ("UINT", "INT", "FLOAT", "DOUBLE") THEN
PRINT "weight_type must be UINT, INT, FLOAT, or DOUBLE" AS errMsg;
RETURN;
END;

##### Initialize #####
init = {v_type};
init = SELECT s
FROM init:s - (e_type_set:e) - v_type:t
ACCUM
FLOAT fl = 0,
CASE cap_type
WHEN "UINT" THEN
fl = e.getAttr(cap_attr, "UINT")
WHEN "INT" THEN
fl = e.getAttr(cap_attr, "INT")
WHEN "FLOAT" THEN
fl = e.getAttr(cap_attr, "FLOAT")
WHEN "DOUBLE" THEN
fl = e.getAttr(cap_attr, "DOUBLE")
END,
@@group_by_flow_accum += (s, t -> 0),
IF s == source THEN
@@max_cap_threshold += fl
END;

//used for determining minimum flow of path, s.t. minimum flow > cap_threshold
@@max_cap_threshold = pow(3, float_to_int(log(@@max_cap_threshold)/log(3)));

##### Push one flow at a time until there is residudal graph is disconnected #####
SumAccum<INT> @@sum_max_flow = 0;

// initialize flow and residual graph
all_vertices = {v_type};
all_vertices = SELECT s
FROM all_vertices:s - (e_type_set:e) - v_type:t
ACCUM
FLOAT cap = 0,
CASE cap_type
WHEN "UINT" THEN
cap = e.getAttr(cap_attr, "UINT")
WHEN "INT" THEN
cap = e.getAttr(cap_attr, "INT")
WHEN "FLOAT" THEN
cap = e.getAttr(cap_attr, "FLOAT")
WHEN "DOUBLE" THEN
cap = e.getAttr(cap_attr, "DOUBLE")
END,
@@flow_gb += (s, t -> 0),
@@residual_gb += (s, t -> cap)
;

// mark the target node as true
endset = {sink};
endset = SELECT s
From endset:s
ACCUM s.@end_point = true;

WHILE TRUE DO
//initilize

init = SELECT s
FROM init:s
POST-ACCUM s.@or_is_visited = FALSE,
s.@trace_back_heap = tb_node(GSQL_INT_MIN, FALSE, source);

start = {source};
start = SELECT s
FROM start:s
POST-ACCUM s.@or_is_visited = TRUE;

@@or_is_found = False;

//BFS to find feasible path from source -> sink
WHILE NOT @@or_is_found AND start.size() > 0 DO
forwd = SELECT t
FROM start:s - (e_type_set:e) - v_type:t
WHERE NOT t.@or_is_visited
ACCUM
FLOAT fl = 0,
CASE cap_type
WHEN "UINT" THEN
fl = e.getAttr(cap_attr, "UINT")
WHEN "INT" THEN
fl = e.getAttr(cap_attr, "INT")
WHEN "FLOAT" THEN
fl = e.getAttr(cap_attr, "FLOAT")
WHEN "DOUBLE" THEN
fl = e.getAttr(cap_attr, "DOUBLE")
END,
IF fl - @@group_by_flow_accum.get(s, t).flow >= @@max_cap_threshold THEN
t.@trace_back_heap += tb_node(fl - @@group_by_flow_accum.get(s, t).flow, TRUE, s),
t.@or_is_visited += TRUE,
@@or_is_found += t == sink
END
HAVING t.@or_is_visited;

bacwd = SELECT t
FROM start:s - (reverse_e_type_set) - v_type:t
WHERE NOT t.@or_is_visited
ACCUM
IF @@group_by_flow_accum.get(t, s).flow >= @@max_cap_threshold THEN
t.@trace_back_heap += tb_node(@@group_by_flow_accum.get(t, s).flow, FALSE, s),
t.@or_is_visited += TRUE,
@@or_is_found += t == sink
END
HAVING t.@or_is_visited;

start = forwd UNION bacwd;
END;

//done when residual graph is disconnected
IF NOT @@or_is_found AND minimum_reached THEN
BREAK;
END;

//reduce cap_threshold to look for more path options
IF NOT @@or_is_found THEN
@@max_cap_threshold = float_to_int(@@max_cap_threshold/3);
IF @@max_cap_threshold < min_flow_threshhold THEN
@@max_cap_threshold = min_flow_threshhold;
minimum_reached = TRUE;
END;

CONTINUE;
END;

//find bottleneck
@@curr_set.clear();
@@curr_set += sink;
@@min_flow = GSQL_INT_MAX;
// Run BFS, starting from the initial node
SourceSet = {source};
SourceSet = SELECT s
FROM SourceSet:s
ACCUM s.@path_list = [s];

WHILE NOT @@curr_set.contains(source) DO
start = @@curr_set;
@@curr_set.clear();
start = SELECT s
FROM start:s
POST-ACCUM @@min_flow += s.@trace_back_heap.top().prev_flow,
@@curr_set += s.@trace_back_heap.top().prev;

END;

@@sum_max_flow += @@min_flow;

//traceback to source and update flow vertices
@@curr_set.clear();
@@curr_set += sink;
WHILE NOT @@curr_set.contains(source) DO
start = @@curr_set;
@@curr_set.clear();
start = SELECT s
FROM start:s
POST-ACCUM
@@curr_set += s.@trace_back_heap.top().prev,
CASE
WHEN s.@trace_back_heap.top().is_forward THEN
@@group_by_flow_accum += (s.@trace_back_heap.top().prev, s -> @@min_flow)
ELSE
@@group_by_flow_accum += (s, s.@trace_back_heap.top().prev -> -@@min_flow)
END;
END;
END;
WHILE SourceSet.size() > 0 and @@total_path_list.size() == 0 DO
SourceSet = SELECT t
FROM SourceSet:s -((reverse_e_type_set|e_type_set):e)- :t
WHERE @@residual_gb.get(s, t).flow > min_flow_threshhold AND t.@path_list.size() == 0
ACCUM
// choose any path for tie-breaking
IF s.@path_list.size() > 0 THEN
IF t.@end_point == true THEN
@@total_path_list += s.@path_list + [t]
ELSE
t.@path_list = s.@path_list + [t]
END
END
;
END; // end of BFS

// if no augmenting path is found, break because we've reached max flow
if @@total_path_list.size() == 0 THEN
break;
END;

@@shortest_path = @@total_path_list.get(0);

@@path_flow = GSQL_INT_MAX;

// see how much flow we can send
FOREACH i IN RANGE[0, @@shortest_path.size() - 2] DO
@@path_flow += @@residual_gb.get(@@shortest_path.get(i), @@shortest_path.get(i + 1)).flow;
END;

// send the flow and update the residual graph
FOREACH i IN RANGE[0, @@shortest_path.size() - 2] DO
@@flow_gb += (@@shortest_path.get(i), @@shortest_path.get(i + 1) -> @@path_flow);
@@residual_gb += (@@shortest_path.get(i + 1), @@shortest_path.get(i) -> @@path_flow);
@@residual_gb += (@@shortest_path.get(i), @@shortest_path.get(i + 1) -> -@@path_flow);
END;
@@sum_max_flow += @@path_flow;

// reset and clear
@@total_path_list.clear();
all_vertices = SELECT s
FROM all_vertices:s
POST-ACCUM s.@path_list.clear();

END;

##### Output #####
IF file_path != "" THEN
f.println("Maxflow: " + to_string(@@sum_max_flow));
f.println("From","To","Flow");
END;
start = {source};
WHILE start.size() != 0 DO
start = SELECT t
FROM start:s - (e_type_set:e) - v_type:t
WHERE @@group_by_flow_accum.get(s,t).flow >= min_flow_threshhold
END;
SourceSet = {source};
WHILE SourceSet.size() != 0 DO
SourceSet = SELECT t
FROM SourceSet:s - (e_type_set:e) - v_type:t
WHERE @@flow_gb.get(s,t).flow > min_flow_threshhold
ACCUM
IF print_results THEN
@@edges_set += e
END,
@@edges_set += e
END,
IF file_path != "" THEN
f.println(s, t, @@group_by_flow_accum.get(s,t).flow)
END;
f.println(s, t, @@flow_gb.get(s,t).flow)
END
;
END;

IF print_results THEN
PRINT @@sum_max_flow;
IF display_edges THEN
PRINT @@edges_set;
END;
END;
}
}
Loading