mirror of
https://github.com/ethauvin/kobalt.git
synced 2025-04-26 16:28:12 -07:00
Bug in DynamicGraph.
Was aborting when there were still running nodes.
This commit is contained in:
parent
3ca365a0fa
commit
6bbd9bd943
1 changed files with 9 additions and 8 deletions
|
@ -49,24 +49,24 @@ public class DynamicGraphExecutor<T>(val graph: DynamicGraph<T>,
|
||||||
public fun run() : Int {
|
public fun run() : Int {
|
||||||
var lastResult = TaskResult()
|
var lastResult = TaskResult()
|
||||||
var gotError = false
|
var gotError = false
|
||||||
|
var nodesRunning = 0
|
||||||
while (graph.freeNodes.size > 0 && ! gotError) {
|
while (graph.freeNodes.size > 0 && ! gotError) {
|
||||||
log(3, "Current count: ${graph.nodeCount}")
|
log(3, "Current node count: ${graph.nodeCount}")
|
||||||
synchronized(graph) {
|
synchronized(graph) {
|
||||||
val freeNodes = graph.freeNodes
|
val freeNodes = graph.freeNodes
|
||||||
freeNodes.forEach { graph.setStatus(it, DynamicGraph.Status.RUNNING)}
|
freeNodes.forEach { graph.setStatus(it, DynamicGraph.Status.RUNNING)}
|
||||||
log(3, "submitting free nodes $freeNodes")
|
log(3, " ==> Submitting " + freeNodes)
|
||||||
val callables : List<IWorker<T>> = factory.createWorkers(freeNodes)
|
val callables : List<IWorker<T>> = factory.createWorkers(freeNodes)
|
||||||
callables.forEach { completion.submit(it) }
|
callables.forEach { completion.submit(it) }
|
||||||
var n = callables.size
|
nodesRunning += callables.size
|
||||||
|
|
||||||
// When a callable ends, see if it freed a node. If not, keep looping
|
// When a callable ends, see if it freed a node. If not, keep looping
|
||||||
while (n > 0 && graph.freeNodes.size == 0 && ! gotError) {
|
while (graph.nodesRunning.size > 0 && graph.freeNodes.size == 0 && ! gotError) {
|
||||||
try {
|
try {
|
||||||
val future = completion.take()
|
val future = completion.take()
|
||||||
val taskResult = future.get(2, TimeUnit.SECONDS)
|
val taskResult = future.get(2, TimeUnit.SECONDS)
|
||||||
lastResult = taskResult
|
lastResult = taskResult
|
||||||
log(3, "Received task result $taskResult")
|
log(3, " <== Received task result $taskResult")
|
||||||
n--
|
|
||||||
graph.setStatus(taskResult.value,
|
graph.setStatus(taskResult.value,
|
||||||
if (taskResult.success) {
|
if (taskResult.success) {
|
||||||
DynamicGraph.Status.FINISHED
|
DynamicGraph.Status.FINISHED
|
||||||
|
@ -106,7 +106,7 @@ public class DynamicGraphExecutor<T>(val graph: DynamicGraph<T>,
|
||||||
*/
|
*/
|
||||||
public class DynamicGraph<T> {
|
public class DynamicGraph<T> {
|
||||||
val nodesReady = linkedSetOf<T>()
|
val nodesReady = linkedSetOf<T>()
|
||||||
private val nodesRunning = linkedSetOf<T>()
|
val nodesRunning = linkedSetOf<T>()
|
||||||
private val nodesFinished = linkedSetOf<T>()
|
private val nodesFinished = linkedSetOf<T>()
|
||||||
private val nodesInError = linkedSetOf<T>()
|
private val nodesInError = linkedSetOf<T>()
|
||||||
private val nodesSkipped = linkedSetOf<T>()
|
private val nodesSkipped = linkedSetOf<T>()
|
||||||
|
@ -171,7 +171,7 @@ public class DynamicGraph<T> {
|
||||||
// }
|
// }
|
||||||
// }
|
// }
|
||||||
|
|
||||||
log(3, "freeNodes: $result")
|
log(3, " freeNodes: $result")
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -264,6 +264,7 @@ public class DynamicGraph<T> {
|
||||||
|
|
||||||
fun dump(nodes: Collection<T>) : String {
|
fun dump(nodes: Collection<T>) : String {
|
||||||
val result = StringBuffer()
|
val result = StringBuffer()
|
||||||
|
result.append("************ Graph dump ***************\n")
|
||||||
val free = arrayListOf<T>()
|
val free = arrayListOf<T>()
|
||||||
nodes.forEach { node ->
|
nodes.forEach { node ->
|
||||||
val d = dependedUpon.get(node)
|
val d = dependedUpon.get(node)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue