From 67c344931ac3e6883372d86e96e00e4d39759c17 Mon Sep 17 00:00:00 2001 From: Cedric Beust Date: Sun, 17 Apr 2016 09:56:54 -0700 Subject: [PATCH] New graph engine. --- .../kotlin/com/beust/kobalt/internal/DG.kt | 242 ++++++++++++++++++ .../com/beust/kobalt/internal/TaskManager.kt | 50 ++-- .../beust/kobalt/internal/TaskManagerTest.kt | 25 +- 3 files changed, 288 insertions(+), 29 deletions(-) create mode 100644 modules/kobalt-plugin-api/src/main/kotlin/com/beust/kobalt/internal/DG.kt diff --git a/modules/kobalt-plugin-api/src/main/kotlin/com/beust/kobalt/internal/DG.kt b/modules/kobalt-plugin-api/src/main/kotlin/com/beust/kobalt/internal/DG.kt new file mode 100644 index 00000000..9833ee3b --- /dev/null +++ b/modules/kobalt-plugin-api/src/main/kotlin/com/beust/kobalt/internal/DG.kt @@ -0,0 +1,242 @@ +package com.beust.kobalt.internal + +import com.beust.kobalt.KobaltException +import com.beust.kobalt.misc.NamedThreadFactory +import com.beust.kobalt.misc.error +import com.beust.kobalt.misc.log +import com.google.common.collect.HashMultimap +import org.testng.Assert +import java.lang.reflect.InvocationTargetException +import java.util.* +import java.util.concurrent.* + +class Node(val value: T) { + override fun hashCode() = value!!.hashCode() + override fun equals(other: Any?) : Boolean { + val result = if (other is Node<*>) other.value == value else false + return result + } + override fun toString() = value.toString() +} + +class DG { + val VERBOSE = 1 + val values : Collection get() = nodes.map { it.value } + internal val nodes = hashSetOf>() + private val dependedUpon = HashMultimap.create, Node>() + private val dependingOn = HashMultimap.create, Node>() + + fun addNode(t: T) = synchronized(nodes) { + nodes.add(Node(t)) + } + + fun removeNode(t: T) = synchronized(nodes) { + log(VERBOSE, " Removing $t") + Node(t).let { node -> + nodes.remove(node) + dependingOn.removeAll(node) + val set = dependedUpon.keySet() + val toReplace = arrayListOf, Collection>>>() + set.forEach { du -> + val l = ArrayList(dependedUpon[du]) + l.remove(node) + toReplace.add(Pair(du, l)) + } + toReplace.forEach { + dependedUpon.replaceValues(it.first, it.second) + } + } + } + + /** + * Make "from" depend on "to" ("from" is no longer free). + */ + fun addEdge(from: T, to: T) { + val fromNode = Node(from) + nodes.add(fromNode) + val toNode = Node(to) + nodes.add(Node(to)) + dependingOn.put(toNode, fromNode) + dependedUpon.put(fromNode, toNode) + } + + val freeNodes: Set + get() { + val nonFree = hashSetOf() + synchronized(nodes) { + nodes.forEach { + val du = dependedUpon[it] + if (du != null && du.size > 0) { + nonFree.add(it.value) + } + } + val result = nodes.map { it.value }.filter { !nonFree.contains(it) }.toHashSet() + return result + } + } + + fun dump() : String { + val result = StringBuffer() + result.append("************ Graph dump ***************\n") + val free = arrayListOf>() + nodes.forEach { node -> + val d = dependedUpon.get(node) + if (d == null || d.isEmpty()) { + free.add(node) + } + } + + result.append("All nodes: $values\n").append("Free nodes: $free").append("\nDependent nodes:\n") + nodes.forEach { + val deps = dependedUpon.get(it) + if (! deps.isEmpty()) { + result.append(" $it -> $deps\n") + } + } + return result.toString() + } +} + +interface IWorker2 : Callable> { + /** + * @return list of tasks this worker is working on. + */ + // val tasks : List + + /** + * @return the priority of this task. + */ + val priority : Int +} + +interface IThreadWorkerFactory2 { + + /** + * Creates {@code IWorker} for specified set of tasks. It is not necessary that + * number of workers returned be same as number of tasks entered. + * + * @param nodes tasks that need to be executed + * @return list of workers + */ + fun createWorkers(nodes: Collection) : List> +} + +class DGExecutor(val graph : DG, val factory: IThreadWorkerFactory2) { + val executor = Executors.newFixedThreadPool(5, NamedThreadFactory("DynamicGraphExecutor")) + val completion = ExecutorCompletionService>(executor) + + fun run() : Int { + try { + return run2() + } finally { + executor.shutdown() + } + } + + private fun run2() : Int { + var running = 0 + var gotError = false + val nodesRun = hashSetOf() + var newFreeNodes = HashSet(graph.freeNodes) + while (! gotError && running > 0 || newFreeNodes.size > 0) { + nodesRun.addAll(newFreeNodes) + val callables : List> = factory.createWorkers(newFreeNodes) + callables.forEach { completion.submit(it) } + running += callables.size + + try { + val future = completion.take() + val taskResult = future.get(2, TimeUnit.SECONDS) + running-- + if (taskResult.success) { + nodesRun.add(taskResult.value) + log(1, "Task succeeded: $taskResult") + graph.removeNode(taskResult.value) + newFreeNodes.clear() + newFreeNodes.addAll(graph.freeNodes.minus(nodesRun)) + } else { + log(1, "Task failed: $taskResult") + gotError = true + } + } catch(ex: TimeoutException) { + log(2, "Time out") + } catch(ex: Exception) { + val ite = ex.cause + if (ite is InvocationTargetException) { + if (ite.targetException is KobaltException) { + throw (ex.cause as InvocationTargetException).targetException + } else { + error("Error: ${ite.cause?.message}", ite.cause) + gotError = true + } + } else { + error("Error: ${ex.message}", ex) + gotError = true + } + } + } + return if (gotError) 1 else 0 + } +} + +fun main(argv: Array) { + val dg = DG().apply { + // a -> b + // b -> c, d + // e + addEdge("a", "b") + addEdge("b", "c") + addEdge("b", "d") + addNode("e") + } + val factory = object : IThreadWorkerFactory2 { + override fun createWorkers(nodes: Collection): List> { + return nodes.map { + object: IWorker2 { + override fun call(): TaskResult2? { + log(1, " Running worker $it") + return TaskResult2(true, null, it) + } + + override val priority: Int get() = 0 + } + } + } + } + + DGExecutor(dg, factory).run() + + DG().apply { + // a -> b + // b -> c, d + // e + // Order should be: [c,d,e] [b] [a] + addEdge("a", "b") + addEdge("b", "c") + addEdge("b", "d") + addNode("e") + log(VERBOSE, dump()) + Assert.assertEquals(freeNodes, setOf("c", "d", "e")) + + removeNode("c") + log(VERBOSE, dump()) + Assert.assertEquals(freeNodes, setOf("d", "e")) + + removeNode("d") + log(VERBOSE, dump()) + Assert.assertEquals(freeNodes, setOf("b", "e")) + + removeNode("e") + log(VERBOSE, dump()) + Assert.assertEquals(freeNodes, setOf("b")) + + removeNode("b") + log(VERBOSE, dump()) + Assert.assertEquals(freeNodes, setOf("a")) + + removeNode("a") + log(VERBOSE, dump()) + Assert.assertTrue(freeNodes.isEmpty()) + Assert.assertTrue(nodes.isEmpty()) + } +} diff --git a/modules/kobalt-plugin-api/src/main/kotlin/com/beust/kobalt/internal/TaskManager.kt b/modules/kobalt-plugin-api/src/main/kotlin/com/beust/kobalt/internal/TaskManager.kt index 1bddf7cb..f09f6369 100644 --- a/modules/kobalt-plugin-api/src/main/kotlin/com/beust/kobalt/internal/TaskManager.kt +++ b/modules/kobalt-plugin-api/src/main/kotlin/com/beust/kobalt/internal/TaskManager.kt @@ -103,12 +103,12 @@ public class TaskManager @Inject constructor(val args: Args, // log(2, "About to run graph:\n ${graph.dump()} ") - val factory = object : IThreadWorkerFactory { - override fun createWorkers(nodes: List) + val factory = object : IThreadWorkerFactory2 { + override fun createWorkers(nodes: Collection) = nodes.map { TaskWorker(listOf(it), args.dryRun, messages) } } - val executor = DynamicGraphExecutor(graph, factory) + val executor = DGExecutor(graph, factory) val thisResult = executor.run() if (thisResult != 0) { log(2, "Marking project ${project.name} as failed") @@ -129,8 +129,8 @@ public class TaskManager @Inject constructor(val args: Args, alwaysRunAfter: TreeMultimap, toName: (T) -> String, accept: (T) -> Boolean): - DynamicGraph { - val graph = DynamicGraph() + DG { + val graph = DG() taskNames.forEach { taskName -> val ti = TaskInfo(taskName) if (!dependencies.keys().contains(ti.taskName)) { @@ -180,30 +180,36 @@ public class TaskManager @Inject constructor(val args: Args, } // - // Add all the runAfter nodes if applicable + // runAfter nodes are run only if they are explicitly requested // - graph.nodes.forEach { node -> - val ra = runAfter[toName(node)] - ra.forEach { o -> - dependencies[o].forEach { - if (o != null) { - graph.addEdge(it, node) + arrayListOf().let { allNodes -> + allNodes.addAll(graph.values) + allNodes.forEach { node -> + val nodeName = toName(node) + if (taskNames.contains(nodeName)) { + val ra = runAfter[nodeName] + ra?.forEach { o -> + dependencies[o]?.forEach { + if (taskNames.contains(toName(it))) { + graph.addEdge(node, it) + } + } } } } - println("RA: $ra") } // // If any of the nodes in the graph has an "alwaysRunAfter", add that edge too // - val allNodes = arrayListOf() - allNodes.addAll(graph.nodes) - allNodes.forEach { node -> - val ra = alwaysRunAfter[toName(node)] - ra?.forEach { o -> - dependencies[o]?.forEach { - graph.addEdge(it, node) + arrayListOf().let { allNodes -> + allNodes.addAll(graph.values) + allNodes.forEach { node -> + val ra = alwaysRunAfter[toName(node)] + ra?.forEach { o -> + dependencies[o]?.forEach { + graph.addEdge(it, node) + } } } } @@ -211,7 +217,7 @@ public class TaskManager @Inject constructor(val args: Args, } } } - + println("@@@ " + graph.dump()) return graph } @@ -381,7 +387,7 @@ public class TaskManager @Inject constructor(val args: Args, } class TaskWorker(val tasks: List, val dryRun: Boolean, val messages: MutableList) - : IWorker { + : IWorker2 { override fun call() : TaskResult2 { if (tasks.size > 0) { diff --git a/src/test/kotlin/com/beust/kobalt/internal/TaskManagerTest.kt b/src/test/kotlin/com/beust/kobalt/internal/TaskManagerTest.kt index 0bf98974..dd572ac8 100644 --- a/src/test/kotlin/com/beust/kobalt/internal/TaskManagerTest.kt +++ b/src/test/kotlin/com/beust/kobalt/internal/TaskManagerTest.kt @@ -11,14 +11,20 @@ import org.testng.annotations.Test @Guice(modules = arrayOf(TestModule::class)) class TaskManagerTest @Inject constructor(val taskManager: TaskManager) { - class DryRunGraphExecutor(val graph: DynamicGraph) { + class DryRunGraphExecutor(val graph: DG) { fun run() : List { val result = arrayListOf() - while (graph.freeNodes.size > 0) { + var freeNodes = graph.freeNodes + while (freeNodes.size > 0) { + val toRemove = arrayListOf() graph.freeNodes.forEach { result.add(it) - graph.setStatus(it, DynamicGraph.Status.FINISHED) + toRemove.add(it) } + toRemove.forEach { + graph.removeNode(it) + } + freeNodes = graph.freeNodes } return result } @@ -29,13 +35,14 @@ class TaskManagerTest @Inject constructor(val taskManager: TaskManager) { put("assemble", "compile") } val runAfter = TreeMultimap.create().apply { - put("clean", "compile") + put("compile", "clean") + put("postCompile", "compile") } val alwaysRunAfter = TreeMultimap.create().apply { put("clean", "copyVersion") } val dependencies = TreeMultimap.create().apply { - listOf("assemble", "compile", "clean").forEach { + listOf("assemble", "compile", "clean", "copyVersion", "postCompile").forEach { put(it, it) } } @@ -48,10 +55,14 @@ class TaskManagerTest @Inject constructor(val taskManager: TaskManager) { @Test fun graphTest() { KobaltLogger.LOG_LEVEL = 3 - Assert.assertEquals(runTasks(listOf("clean")), listOf("clean", "copyVersion")) + Assert.assertEquals(runTasks(listOf("postCompile")), listOf("postCompile")) Assert.assertEquals(runTasks(listOf("compile")), listOf("compile")) + Assert.assertEquals(runTasks(listOf("compile", "postCompile")), listOf("compile", "postCompile")) + Assert.assertEquals(runTasks(listOf("clean")), listOf("clean", "copyVersion")) + Assert.assertEquals(runTasks(listOf("clean", "compile")), listOf("clean", "compile", "copyVersion")) Assert.assertEquals(runTasks(listOf("assemble")), listOf("compile", "assemble")) - Assert.assertEquals(runTasks(listOf("clean", "assemble")), listOf("clean", "compile", "assemble")) + Assert.assertEquals(runTasks(listOf("clean", "assemble")), listOf("clean", "compile", "assemble", + "copyVersion")) } }