1
0
Fork 0
mirror of https://github.com/ethauvin/kobalt.git synced 2025-04-26 16:28:12 -07:00

New graph engine.

This commit is contained in:
Cedric Beust 2016-04-17 09:56:54 -07:00
parent c9e595a3a0
commit 67c344931a
3 changed files with 288 additions and 29 deletions

View file

@ -0,0 +1,242 @@
package com.beust.kobalt.internal
import com.beust.kobalt.KobaltException
import com.beust.kobalt.misc.NamedThreadFactory
import com.beust.kobalt.misc.error
import com.beust.kobalt.misc.log
import com.google.common.collect.HashMultimap
import org.testng.Assert
import java.lang.reflect.InvocationTargetException
import java.util.*
import java.util.concurrent.*
class Node<T>(val value: T) {
override fun hashCode() = value!!.hashCode()
override fun equals(other: Any?) : Boolean {
val result = if (other is Node<*>) other.value == value else false
return result
}
override fun toString() = value.toString()
}
class DG<T> {
val VERBOSE = 1
val values : Collection<T> get() = nodes.map { it.value }
internal val nodes = hashSetOf<Node<T>>()
private val dependedUpon = HashMultimap.create<Node<T>, Node<T>>()
private val dependingOn = HashMultimap.create<Node<T>, Node<T>>()
fun addNode(t: T) = synchronized(nodes) {
nodes.add(Node(t))
}
fun removeNode(t: T) = synchronized(nodes) {
log(VERBOSE, " Removing $t")
Node(t).let { node ->
nodes.remove(node)
dependingOn.removeAll(node)
val set = dependedUpon.keySet()
val toReplace = arrayListOf<Pair<Node<T>, Collection<Node<T>>>>()
set.forEach { du ->
val l = ArrayList(dependedUpon[du])
l.remove(node)
toReplace.add(Pair(du, l))
}
toReplace.forEach {
dependedUpon.replaceValues(it.first, it.second)
}
}
}
/**
* Make "from" depend on "to" ("from" is no longer free).
*/
fun addEdge(from: T, to: T) {
val fromNode = Node(from)
nodes.add(fromNode)
val toNode = Node(to)
nodes.add(Node(to))
dependingOn.put(toNode, fromNode)
dependedUpon.put(fromNode, toNode)
}
val freeNodes: Set<T>
get() {
val nonFree = hashSetOf<T>()
synchronized(nodes) {
nodes.forEach {
val du = dependedUpon[it]
if (du != null && du.size > 0) {
nonFree.add(it.value)
}
}
val result = nodes.map { it.value }.filter { !nonFree.contains(it) }.toHashSet()
return result
}
}
fun dump() : String {
val result = StringBuffer()
result.append("************ Graph dump ***************\n")
val free = arrayListOf<Node<T>>()
nodes.forEach { node ->
val d = dependedUpon.get(node)
if (d == null || d.isEmpty()) {
free.add(node)
}
}
result.append("All nodes: $values\n").append("Free nodes: $free").append("\nDependent nodes:\n")
nodes.forEach {
val deps = dependedUpon.get(it)
if (! deps.isEmpty()) {
result.append(" $it -> $deps\n")
}
}
return result.toString()
}
}
interface IWorker2<T> : Callable<TaskResult2<T>> {
/**
* @return list of tasks this worker is working on.
*/
// val tasks : List<T>
/**
* @return the priority of this task.
*/
val priority : Int
}
interface IThreadWorkerFactory2<T> {
/**
* Creates {@code IWorker} for specified set of tasks. It is not necessary that
* number of workers returned be same as number of tasks entered.
*
* @param nodes tasks that need to be executed
* @return list of workers
*/
fun createWorkers(nodes: Collection<T>) : List<IWorker2<T>>
}
class DGExecutor<T>(val graph : DG<T>, val factory: IThreadWorkerFactory2<T>) {
val executor = Executors.newFixedThreadPool(5, NamedThreadFactory("DynamicGraphExecutor"))
val completion = ExecutorCompletionService<TaskResult2<T>>(executor)
fun run() : Int {
try {
return run2()
} finally {
executor.shutdown()
}
}
private fun run2() : Int {
var running = 0
var gotError = false
val nodesRun = hashSetOf<T>()
var newFreeNodes = HashSet<T>(graph.freeNodes)
while (! gotError && running > 0 || newFreeNodes.size > 0) {
nodesRun.addAll(newFreeNodes)
val callables : List<IWorker2<T>> = factory.createWorkers(newFreeNodes)
callables.forEach { completion.submit(it) }
running += callables.size
try {
val future = completion.take()
val taskResult = future.get(2, TimeUnit.SECONDS)
running--
if (taskResult.success) {
nodesRun.add(taskResult.value)
log(1, "Task succeeded: $taskResult")
graph.removeNode(taskResult.value)
newFreeNodes.clear()
newFreeNodes.addAll(graph.freeNodes.minus(nodesRun))
} else {
log(1, "Task failed: $taskResult")
gotError = true
}
} catch(ex: TimeoutException) {
log(2, "Time out")
} catch(ex: Exception) {
val ite = ex.cause
if (ite is InvocationTargetException) {
if (ite.targetException is KobaltException) {
throw (ex.cause as InvocationTargetException).targetException
} else {
error("Error: ${ite.cause?.message}", ite.cause)
gotError = true
}
} else {
error("Error: ${ex.message}", ex)
gotError = true
}
}
}
return if (gotError) 1 else 0
}
}
fun main(argv: Array<String>) {
val dg = DG<String>().apply {
// a -> b
// b -> c, d
// e
addEdge("a", "b")
addEdge("b", "c")
addEdge("b", "d")
addNode("e")
}
val factory = object : IThreadWorkerFactory2<String> {
override fun createWorkers(nodes: Collection<String>): List<IWorker2<String>> {
return nodes.map {
object: IWorker2<String> {
override fun call(): TaskResult2<String>? {
log(1, " Running worker $it")
return TaskResult2(true, null, it)
}
override val priority: Int get() = 0
}
}
}
}
DGExecutor(dg, factory).run()
DG<String>().apply {
// a -> b
// b -> c, d
// e
// Order should be: [c,d,e] [b] [a]
addEdge("a", "b")
addEdge("b", "c")
addEdge("b", "d")
addNode("e")
log(VERBOSE, dump())
Assert.assertEquals(freeNodes, setOf("c", "d", "e"))
removeNode("c")
log(VERBOSE, dump())
Assert.assertEquals(freeNodes, setOf("d", "e"))
removeNode("d")
log(VERBOSE, dump())
Assert.assertEquals(freeNodes, setOf("b", "e"))
removeNode("e")
log(VERBOSE, dump())
Assert.assertEquals(freeNodes, setOf("b"))
removeNode("b")
log(VERBOSE, dump())
Assert.assertEquals(freeNodes, setOf("a"))
removeNode("a")
log(VERBOSE, dump())
Assert.assertTrue(freeNodes.isEmpty())
Assert.assertTrue(nodes.isEmpty())
}
}

View file

@ -103,12 +103,12 @@ public class TaskManager @Inject constructor(val args: Args,
//
log(2, "About to run graph:\n ${graph.dump()} ")
val factory = object : IThreadWorkerFactory<PluginTask> {
override fun createWorkers(nodes: List<PluginTask>)
val factory = object : IThreadWorkerFactory2<PluginTask> {
override fun createWorkers(nodes: Collection<PluginTask>)
= nodes.map { TaskWorker(listOf(it), args.dryRun, messages) }
}
val executor = DynamicGraphExecutor(graph, factory)
val executor = DGExecutor(graph, factory)
val thisResult = executor.run()
if (thisResult != 0) {
log(2, "Marking project ${project.name} as failed")
@ -129,8 +129,8 @@ public class TaskManager @Inject constructor(val args: Args,
alwaysRunAfter: TreeMultimap<String, String>,
toName: (T) -> String,
accept: (T) -> Boolean):
DynamicGraph<T> {
val graph = DynamicGraph<T>()
DG<T> {
val graph = DG<T>()
taskNames.forEach { taskName ->
val ti = TaskInfo(taskName)
if (!dependencies.keys().contains(ti.taskName)) {
@ -180,25 +180,30 @@ public class TaskManager @Inject constructor(val args: Args,
}
//
// Add all the runAfter nodes if applicable
// runAfter nodes are run only if they are explicitly requested
//
graph.nodes.forEach { node ->
val ra = runAfter[toName(node)]
ra.forEach { o ->
dependencies[o].forEach {
if (o != null) {
graph.addEdge(it, node)
arrayListOf<T>().let { allNodes ->
allNodes.addAll(graph.values)
allNodes.forEach { node ->
val nodeName = toName(node)
if (taskNames.contains(nodeName)) {
val ra = runAfter[nodeName]
ra?.forEach { o ->
dependencies[o]?.forEach {
if (taskNames.contains(toName(it))) {
graph.addEdge(node, it)
}
}
}
}
}
println("RA: $ra")
}
//
// If any of the nodes in the graph has an "alwaysRunAfter", add that edge too
//
val allNodes = arrayListOf<T>()
allNodes.addAll(graph.nodes)
arrayListOf<T>().let { allNodes ->
allNodes.addAll(graph.values)
allNodes.forEach { node ->
val ra = alwaysRunAfter[toName(node)]
ra?.forEach { o ->
@ -211,7 +216,8 @@ public class TaskManager @Inject constructor(val args: Args,
}
}
}
}
println("@@@ " + graph.dump())
return graph
}
@ -381,7 +387,7 @@ public class TaskManager @Inject constructor(val args: Args,
}
class TaskWorker(val tasks: List<PluginTask>, val dryRun: Boolean, val messages: MutableList<String>)
: IWorker<PluginTask> {
: IWorker2<PluginTask> {
override fun call() : TaskResult2<PluginTask> {
if (tasks.size > 0) {

View file

@ -11,14 +11,20 @@ import org.testng.annotations.Test
@Guice(modules = arrayOf(TestModule::class))
class TaskManagerTest @Inject constructor(val taskManager: TaskManager) {
class DryRunGraphExecutor<T>(val graph: DynamicGraph<T>) {
class DryRunGraphExecutor<T>(val graph: DG<T>) {
fun run() : List<T> {
val result = arrayListOf<T>()
while (graph.freeNodes.size > 0) {
var freeNodes = graph.freeNodes
while (freeNodes.size > 0) {
val toRemove = arrayListOf<T>()
graph.freeNodes.forEach {
result.add(it)
graph.setStatus(it, DynamicGraph.Status.FINISHED)
toRemove.add(it)
}
toRemove.forEach {
graph.removeNode(it)
}
freeNodes = graph.freeNodes
}
return result
}
@ -29,13 +35,14 @@ class TaskManagerTest @Inject constructor(val taskManager: TaskManager) {
put("assemble", "compile")
}
val runAfter = TreeMultimap.create<String, String>().apply {
put("clean", "compile")
put("compile", "clean")
put("postCompile", "compile")
}
val alwaysRunAfter = TreeMultimap.create<String, String>().apply {
put("clean", "copyVersion")
}
val dependencies = TreeMultimap.create<String, String>().apply {
listOf("assemble", "compile", "clean").forEach {
listOf("assemble", "compile", "clean", "copyVersion", "postCompile").forEach {
put(it, it)
}
}
@ -48,10 +55,14 @@ class TaskManagerTest @Inject constructor(val taskManager: TaskManager) {
@Test
fun graphTest() {
KobaltLogger.LOG_LEVEL = 3
Assert.assertEquals(runTasks(listOf("clean")), listOf("clean", "copyVersion"))
Assert.assertEquals(runTasks(listOf("postCompile")), listOf("postCompile"))
Assert.assertEquals(runTasks(listOf("compile")), listOf("compile"))
Assert.assertEquals(runTasks(listOf("compile", "postCompile")), listOf("compile", "postCompile"))
Assert.assertEquals(runTasks(listOf("clean")), listOf("clean", "copyVersion"))
Assert.assertEquals(runTasks(listOf("clean", "compile")), listOf("clean", "compile", "copyVersion"))
Assert.assertEquals(runTasks(listOf("assemble")), listOf("compile", "assemble"))
Assert.assertEquals(runTasks(listOf("clean", "assemble")), listOf("clean", "compile", "assemble"))
Assert.assertEquals(runTasks(listOf("clean", "assemble")), listOf("clean", "compile", "assemble",
"copyVersion"))
}
}