1
0
Fork 0
mirror of https://github.com/ethauvin/kobalt.git synced 2025-04-26 08:27:12 -07:00

Introducing parallel builds.

This commit is contained in:
Cedric Beust 2016-08-01 22:57:19 -08:00
parent e7f06add24
commit 028e360da1
7 changed files with 382 additions and 239 deletions

View file

@ -53,6 +53,9 @@ class Args {
@Parameter(names = arrayOf("--noIncremental"), description = "Turn off incremental builds")
var noIncremental: Boolean = false
@Parameter(names = arrayOf("--parallel"), description = "Build all the projects in parallel whenever possible")
var parallel: Boolean = false
@Parameter(names = arrayOf("--plugins"), description = "Comma-separated list of plug-in Maven id's")
var pluginIds: String? = null

View file

@ -0,0 +1,160 @@
package com.beust.kobalt.internal
import com.beust.kobalt.api.Project
import com.beust.kobalt.misc.log
import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.ArrayListMultimap
import com.google.common.collect.Multimap
import java.util.*
abstract class BaseProjectRunner {
abstract fun runProjects(taskInfos: List<TaskManager.TaskInfo>, projects: List<Project>)
: TaskManager.RunTargetResult
companion object {
val LOG_LEVEL = TaskManager.LOG_LEVEL
/**
* Create a graph representing the tasks and their dependencies. That graph will then be run
* in topological order.
*
* @taskNames is the list of tasks requested by the user. @nodeMap maps these tasks to the nodes
* we'll be adding to the graph while @toName extracts the name of a node.
*/
@VisibleForTesting
fun <T> createTaskGraph(projectName: String, passedTasks: List<TaskManager.TaskInfo>,
nodeMap: Multimap<String, T>,
dependsOn: Multimap<String, String>,
reverseDependsOn: Multimap<String, String>,
runBefore: Multimap<String, String>,
runAfter: Multimap<String, String>,
alwaysRunAfter: Multimap<String, String>,
toName: (T) -> String,
accept: (T) -> Boolean):
DynamicGraph<T> {
val result = DynamicGraph<T>()
val newToProcess = arrayListOf<T>()
val seen = hashSetOf<String>()
//
// Reverse the always map so that tasks can be looked up.
//
val always = ArrayListMultimap.create<String, String>().apply {
alwaysRunAfter.keySet().forEach { k ->
alwaysRunAfter[k].forEach { v ->
put(v, k)
}
}
}
//
// Keep only the tasks we need to run.
//
val taskInfos = passedTasks.filter {
it.matches(projectName)
}
// The nodes we need to process, initialized with the set of tasks requested by the user.
// As we run the graph and discover dependencies, new nodes get added to @param[newToProcess]. At
// the end of the loop, @param[toProcess] is cleared and all the new nodes get added to it. Then we loop.
val toProcess = ArrayList(taskInfos)
while (toProcess.size > 0) {
/**
* Add an edge from @param from to all its tasks.
*/
fun addEdge(result: DynamicGraph<T>, from: String, to: String, newToProcess: ArrayList<T>, text: String) {
val froms = nodeMap[from]
froms.forEach { f: T ->
nodeMap[to].forEach { t: T ->
log(LOG_LEVEL, " Adding edge ($text) $f -> $t")
result.addEdge(f, t)
newToProcess.add(t)
}
}
}
/**
* Whenever a task is added to the graph, we also add its alwaysRunAfter tasks.
*/
fun processAlways(always: Multimap<String, String>, node: T) {
log(LOG_LEVEL, " Processing always for $node")
always[toName(node)]?.let { to: Collection<String> ->
to.forEach { t ->
nodeMap[t].forEach { from ->
log(LOG_LEVEL, " Adding always edge $from -> $node")
result.addEdge(from, node)
}
}
log(LOG_LEVEL, " ... done processing always for $node")
}
}
log(LOG_LEVEL, " Current batch to process: $toProcess")
//
// Move dependsOn + reverseDependsOn in one multimap called allDepends
//
val allDependsOn = ArrayListMultimap.create<String, String>()
dependsOn.keySet().forEach { key ->
dependsOn[key].forEach { value ->
allDependsOn.put(key, value)
}
}
reverseDependsOn.keySet().forEach { key ->
reverseDependsOn[key].forEach { value ->
allDependsOn.put(value, key)
}
}
//
// Process each node one by one
//
toProcess.forEach { taskInfo ->
val taskName = taskInfo.taskName
log(LOG_LEVEL, " ***** Current node: $taskName")
nodeMap[taskName].forEach {
result.addNode(it)
processAlways(always, it)
}
//
// dependsOn and reverseDependsOn are considered for all tasks, explicit and implicit
//
allDependsOn[taskName].forEach { to ->
addEdge(result, taskName, to, newToProcess, "dependsOn")
}
//
// runBefore and runAfter (task ordering) are only considered for explicit tasks (tasks that were
// explicitly requested by the user)
//
passedTasks.map { it.id }.let { taskNames ->
runBefore[taskName].forEach { from ->
if (taskNames.contains(from)) {
addEdge(result, from, taskName, newToProcess, "runBefore")
}
}
runAfter[taskName].forEach { to ->
if (taskNames.contains(to)) {
addEdge(result, taskName, to, newToProcess, "runAfter")
}
}
}
seen.add(taskName)
}
newToProcess.forEach { processAlways(always, it) }
toProcess.clear()
toProcess.addAll(newToProcess.filter { !seen.contains(toName(it)) }.map { TaskManager.TaskInfo(toName(it)) })
newToProcess.clear()
}
return result
}
}
}

View file

@ -180,8 +180,10 @@ interface IThreadWorkerFactory<T> {
fun createWorkers(nodes: Collection<T>) : List<IWorker<T>>
}
class DynamicGraphExecutor<T>(val graph : DynamicGraph<T>, val factory: IThreadWorkerFactory<T>) {
val executor = Executors.newFixedThreadPool(1, NamedThreadFactory("DynamicGraphExecutor"))
class DynamicGraphExecutor<T>(val graph : DynamicGraph<T>, val factory: IThreadWorkerFactory<T>,
threadCount: Int = 1) {
val executor : ExecutorService
= Executors.newFixedThreadPool(threadCount, NamedThreadFactory("DynamicGraphExecutor"))
val completion = ExecutorCompletionService<TaskResult2<T>>(executor)
fun run() : TaskResult {

View file

@ -0,0 +1,102 @@
package com.beust.kobalt.internal
import com.beust.kobalt.Args
import com.beust.kobalt.AsciiArt
import com.beust.kobalt.TaskResult
import com.beust.kobalt.api.*
import com.beust.kobalt.misc.Strings
import com.beust.kobalt.misc.kobaltError
import com.beust.kobalt.misc.log
import com.google.common.collect.ListMultimap
import com.google.common.collect.TreeMultimap
import java.util.*
/**
* Build the projects in parallel.
*
* The projects are sorted in topological order and then run by the DynamicGraphExecutor in background threads
* wherever appropriate. Inside a project, all the tasks are run sequentially.
*/
class ParallelProjectRunner(val tasksByNames: (Project) -> ListMultimap<String, ITask>,
val dependsOn: TreeMultimap<String, String>,
val reverseDependsOn: TreeMultimap<String, String>, val runBefore: TreeMultimap<String, String>,
val runAfter: TreeMultimap<String, String>,
val alwaysRunAfter: TreeMultimap<String, String>, val args: Args, val pluginInfo: PluginInfo)
: BaseProjectRunner() {
override fun runProjects(taskInfos: List<TaskManager.TaskInfo>, projects: List<Project>)
: TaskManager .RunTargetResult {
var result = TaskResult()
val failedProjects = hashSetOf<String>()
val messages = Collections.synchronizedList(arrayListOf<TaskManager.ProfilerInfo>())
fun runProjectListeners(project: Project, context: KobaltContext, start: Boolean,
status: ProjectBuildStatus = ProjectBuildStatus.SUCCESS) {
context.pluginInfo.buildListeners.forEach {
if (start) it.projectStart(project, context) else it.projectEnd(project, context, status)
}
}
val context = Kobalt.context!!
projects.forEach { project ->
AsciiArt.logBox("Building ${project.name}")
// Does the current project depend on any failed projects?
val fp = project.dependsOn.filter {
failedProjects.contains(it.name)
}.map {
it.name
}
if (fp.size > 0) {
log(2, "Marking project ${project.name} as skipped")
failedProjects.add(project.name)
runProjectListeners(project, context, false, ProjectBuildStatus.SKIPPED)
kobaltError("Not building project ${project.name} since it depends on failed "
+ Strings.pluralize(fp.size, "project")
+ " " + fp.joinToString(","))
} else {
runProjectListeners(project, context, true)
// There can be multiple tasks by the same name (e.g. PackagingPlugin and AndroidPlugin both
// define "install"), so use a multimap
val tasksByNames = tasksByNames(project)
log(3, "Tasks:")
tasksByNames.keys().forEach {
log(3, " $it: " + tasksByNames.get(it))
}
val graph = createTaskGraph(project.name, taskInfos, tasksByNames,
dependsOn, reverseDependsOn, runBefore, runAfter, alwaysRunAfter,
{ task: ITask -> task.name },
{ task: ITask -> task.plugin.accept(project) })
//
// Now that we have a full graph, run it
//
log(2, "About to run graph:\n ${graph.dump()} ")
val factory = object : IThreadWorkerFactory<ITask> {
override fun createWorkers(nodes: Collection<ITask>)
= nodes.map { TaskWorker(listOf(it), args.dryRun, pluginInfo) }
}
val executor = DynamicGraphExecutor(graph, factory, 5)
val thisResult = executor.run()
if (! thisResult.success) {
log(2, "Marking project ${project.name} as failed")
failedProjects.add(project.name)
}
runProjectListeners(project, context, false,
if (thisResult.success) ProjectBuildStatus.SUCCESS else ProjectBuildStatus.FAILED)
if (result.success) {
result = thisResult
}
}
}
return TaskManager.RunTargetResult(result, messages)
}
}

View file

@ -0,0 +1,102 @@
package com.beust.kobalt.internal
import com.beust.kobalt.Args
import com.beust.kobalt.AsciiArt
import com.beust.kobalt.TaskResult
import com.beust.kobalt.api.*
import com.beust.kobalt.misc.Strings
import com.beust.kobalt.misc.kobaltError
import com.beust.kobalt.misc.log
import com.google.common.collect.ListMultimap
import com.google.common.collect.TreeMultimap
import java.util.*
/**
* Build the projects in parallel.
*
* The projects are sorted in topological order and then run by the DynamicGraphExecutor in a single thread.
*/
class SequentialProjectRunner(val tasksByNames: (Project) -> ListMultimap<String, ITask>,
val dependsOn: TreeMultimap<String, String>,
val reverseDependsOn: TreeMultimap<String, String>, val runBefore: TreeMultimap<String, String>,
val runAfter: TreeMultimap<String, String>,
val alwaysRunAfter: TreeMultimap<String, String>, val args: Args, val pluginInfo: PluginInfo)
: BaseProjectRunner() {
override fun runProjects(taskInfos: List<TaskManager.TaskInfo>, projects: List<Project>)
: TaskManager.RunTargetResult {
var result = TaskResult()
val failedProjects = hashSetOf<String>()
val messages = Collections.synchronizedList(arrayListOf<TaskManager.ProfilerInfo>())
fun runProjectListeners(project: Project, context: KobaltContext, start: Boolean,
status: ProjectBuildStatus = ProjectBuildStatus.SUCCESS) {
context.pluginInfo.buildListeners.forEach {
if (start) it.projectStart(project, context) else it.projectEnd(project, context, status)
}
}
val context = Kobalt.context!!
projects.forEach { project ->
AsciiArt.logBox("Building ${project.name}")
// Does the current project depend on any failed projects?
val fp = project.dependsOn.filter {
failedProjects.contains(it.name)
}.map {
it.name
}
if (fp.size > 0) {
log(2, "Marking project ${project.name} as skipped")
failedProjects.add(project.name)
runProjectListeners(project, context, false, ProjectBuildStatus.SKIPPED)
kobaltError("Not building project ${project.name} since it depends on failed "
+ Strings.pluralize(fp.size, "project")
+ " " + fp.joinToString(","))
} else {
runProjectListeners(project, context, true)
// There can be multiple tasks by the same name (e.g. PackagingPlugin and AndroidPlugin both
// define "install"), so use a multimap
val tasksByNames = tasksByNames(project)
log(3, "Tasks:")
tasksByNames.keys().forEach {
log(3, " $it: " + tasksByNames.get(it))
}
val graph = createTaskGraph(project.name, taskInfos, tasksByNames,
dependsOn, reverseDependsOn, runBefore, runAfter, alwaysRunAfter,
{ task: ITask -> task.name },
{ task: ITask -> task.plugin.accept(project) })
//
// Now that we have a full graph, run it
//
log(2, "About to run graph:\n ${graph.dump()} ")
val factory = object : IThreadWorkerFactory<ITask> {
override fun createWorkers(nodes: Collection<ITask>)
= nodes.map { TaskWorker(listOf(it), args.dryRun, pluginInfo) }
}
val executor = DynamicGraphExecutor(graph, factory)
val thisResult = executor.run()
if (! thisResult.success) {
log(2, "Marking project ${project.name} as failed")
failedProjects.add(project.name)
}
runProjectListeners(project, context, false,
if (thisResult.success) ProjectBuildStatus.SUCCESS else ProjectBuildStatus.FAILED)
if (result.success) {
result = thisResult
}
}
}
return TaskManager.RunTargetResult(result, messages)
}
}

View file

@ -4,14 +4,10 @@ import com.beust.kobalt.*
import com.beust.kobalt.api.*
import com.beust.kobalt.api.annotation.IncrementalTask
import com.beust.kobalt.api.annotation.Task
import com.beust.kobalt.misc.Strings
import com.beust.kobalt.misc.Topological
import com.beust.kobalt.misc.kobaltError
import com.beust.kobalt.misc.log
import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.ArrayListMultimap
import com.google.common.collect.ListMultimap
import com.google.common.collect.Multimap
import com.google.common.collect.TreeMultimap
import java.lang.reflect.Method
import java.util.*
@ -28,6 +24,10 @@ class TaskManager @Inject constructor(val args: Args,
private val runAfter = TreeMultimap.create<String, String>()
private val alwaysRunAfter = TreeMultimap.create<String, String>()
companion object {
val LOG_LEVEL = 3
}
/**
* Dependency: task2 depends on task 1.
*/
@ -97,7 +97,12 @@ class TaskManager @Inject constructor(val args: Args,
taskInfos = taskInfos.filter { hasTask(it) }
val projectsToRun = findProjectsToRun(taskInfos, allProjects)
return runProjects(taskInfos, projectsToRun)
val projectRunner =
if (args.parallel) ParallelProjectRunner({ p -> tasksByNames(p) }, dependsOn,
reverseDependsOn, runBefore, runAfter, alwaysRunAfter, args, pluginInfo)
else SequentialProjectRunner({ p -> tasksByNames(p) }, dependsOn,
reverseDependsOn, runBefore, runAfter, alwaysRunAfter, args, pluginInfo)
return projectRunner.runProjects(taskInfos, projectsToRun)
}
/**
@ -127,82 +132,6 @@ class TaskManager @Inject constructor(val args: Args,
class ProfilerInfo(val taskName: String, val durationMillis: Long)
private fun runProjects(taskInfos: List<TaskInfo>, projects: List<Project>) : RunTargetResult {
var result = TaskResult()
val failedProjects = hashSetOf<String>()
val messages = Collections.synchronizedList(arrayListOf<ProfilerInfo>())
fun runProjectListeners(project: Project, context: KobaltContext, start: Boolean,
status: ProjectBuildStatus = ProjectBuildStatus.SUCCESS) {
context.pluginInfo.buildListeners.forEach {
if (start) it.projectStart(project, context) else it.projectEnd(project, context, status)
}
}
val context = Kobalt.context!!
projects.forEach { project ->
AsciiArt.logBox("Building ${project.name}")
// Does the current project depend on any failed projects?
val fp = project.dependsOn.filter {
failedProjects.contains(it.name)
}.map {
it.name
}
if (fp.size > 0) {
log(2, "Marking project ${project.name} as skipped")
failedProjects.add(project.name)
runProjectListeners(project, context, false, ProjectBuildStatus.SKIPPED)
kobaltError("Not building project ${project.name} since it depends on failed "
+ Strings.pluralize(fp.size, "project")
+ " " + fp.joinToString(","))
} else {
runProjectListeners(project, context, true)
// There can be multiple tasks by the same name (e.g. PackagingPlugin and AndroidPlugin both
// define "install"), so use a multimap
val tasksByNames = tasksByNames(project)
log(3, "Tasks:")
tasksByNames.keys().forEach {
log(3, " $it: " + tasksByNames.get(it))
}
val graph = createGraph2(project.name, taskInfos, tasksByNames,
dependsOn, reverseDependsOn, runBefore, runAfter, alwaysRunAfter,
{ task: ITask -> task.name },
{ task: ITask -> task.plugin.accept(project) })
//
// Now that we have a full graph, run it
//
log(2, "About to run graph:\n ${graph.dump()} ")
val factory = object : IThreadWorkerFactory<ITask> {
override fun createWorkers(nodes: Collection<ITask>)
= nodes.map { TaskWorker(listOf(it), args.dryRun, pluginInfo) }
}
val executor = DynamicGraphExecutor(graph, factory)
val thisResult = executor.run()
if (! thisResult.success) {
log(2, "Marking project ${project.name} as failed")
failedProjects.add(project.name)
}
runProjectListeners(project, context, false,
if (thisResult.success) ProjectBuildStatus.SUCCESS else ProjectBuildStatus.FAILED)
if (result.success) {
result = thisResult
}
}
}
return RunTargetResult(result, messages)
}
/**
* If the user wants to run a single task on a single project (e.g. "kobalt:assemble"), we need to
* see if that project depends on others and if it does, compile these projects first. This
@ -240,160 +169,6 @@ class TaskManager @Inject constructor(val args: Args,
}
}
val LOG_LEVEL = 3
/**
* Create a graph representing the tasks and their dependencies. That graph will then be run
* in topological order.
*
* @taskNames is the list of tasks requested by the user. @nodeMap maps these tasks to the nodes
* we'll be adding to the graph while @toName extracts the name of a node.
*/
@VisibleForTesting
fun <T> createGraph2(projectName: String, passedTasks: List<TaskInfo>,
nodeMap: Multimap<String, T>,
dependsOn: Multimap<String, String>,
reverseDependsOn: Multimap<String, String>,
runBefore: Multimap<String, String>,
runAfter: Multimap<String, String>,
alwaysRunAfter: Multimap<String, String>,
toName: (T) -> String,
accept: (T) -> Boolean):
DynamicGraph<T> {
val result = DynamicGraph<T>()
val newToProcess = arrayListOf<T>()
val seen = hashSetOf<String>()
// Make each task depend on the previous one, so that command line tasks are executed in the
// order the user specified them
// passedTasks.zip(passedTasks.drop(1)).forEach { pair ->
// nodeMap[pair.first.taskName].forEach { first ->
// nodeMap[pair.second.taskName].forEach { second ->
// result.addEdge(second, first)
// }
// }
// }
//
// Reverse the always map so that tasks can be looked up.
//
val always = ArrayListMultimap.create<String, String>().apply {
alwaysRunAfter.keySet().forEach { k ->
alwaysRunAfter[k].forEach { v ->
put(v, k)
}
}
}
//
// Keep only the tasks we need to run.
//
val taskInfos = passedTasks.filter {
it.matches(projectName)
}
// The nodes we need to process, initialized with the set of tasks requested by the user.
// As we run the graph and discover dependencies, new nodes get added to @param[newToProcess]. At
// the end of the loop, @param[toProcess] is cleared and all the new nodes get added to it. Then we loop.
val toProcess = ArrayList(taskInfos)
while (toProcess.size > 0) {
/**
* Add an edge from @param from to all its tasks.
*/
fun addEdge(result: DynamicGraph<T>, from: String, to: String, newToProcess: ArrayList<T>, text: String) {
val froms = nodeMap[from]
froms.forEach { f: T ->
nodeMap[to].forEach { t: T ->
log(LOG_LEVEL, " Adding edge ($text) $f -> $t")
result.addEdge(f, t)
newToProcess.add(t)
}
}
}
/**
* Whenever a task is added to the graph, we also add its alwaysRunAfter tasks.
*/
fun processAlways(always: Multimap<String, String>, node: T) {
log(LOG_LEVEL, " Processing always for $node")
always[toName(node)]?.let { to: Collection<String> ->
to.forEach { t ->
nodeMap[t].forEach { from ->
log(LOG_LEVEL, " Adding always edge $from -> $node")
result.addEdge(from, node)
}
}
log(LOG_LEVEL, " ... done processing always for $node")
}
}
log(LOG_LEVEL, " Current batch to process: $toProcess")
//
// Move dependsOn + reverseDependsOn in one multimap called allDepends
//
val allDependsOn = ArrayListMultimap.create<String, String>()
dependsOn.keySet().forEach { key ->
dependsOn[key].forEach { value ->
allDependsOn.put(key, value)
}
}
reverseDependsOn.keySet().forEach { key ->
reverseDependsOn[key].forEach { value ->
allDependsOn.put(value, key)
}
}
//
// Process each node one by one
//
toProcess.forEach { taskInfo ->
val taskName = taskInfo.taskName
log(LOG_LEVEL, " ***** Current node: $taskName")
nodeMap[taskName].forEach {
result.addNode(it)
processAlways(always, it)
}
//
// dependsOn and reverseDependsOn are considered for all tasks, explicit and implicit
//
allDependsOn[taskName].forEach { to ->
addEdge(result, taskName, to, newToProcess, "dependsOn")
}
//
// runBefore and runAfter (task ordering) are only considered for explicit tasks (tasks that were
// explicitly requested by the user)
//
passedTasks.map { it.id }.let { taskNames ->
runBefore[taskName].forEach { from ->
if (taskNames.contains(from)) {
addEdge(result, from, taskName, newToProcess, "runBefore")
}
}
runAfter[taskName].forEach { to ->
if (taskNames.contains(to)) {
addEdge(result, taskName, to, newToProcess, "runAfter")
}
}
}
seen.add(taskName)
}
newToProcess.forEach { processAlways(always, it) }
toProcess.clear()
toProcess.addAll(newToProcess.filter { ! seen.contains(toName(it))}.map { TaskInfo(toName(it)) })
newToProcess.clear()
}
return result
}
/////
// Manage the tasks
//