2
0
Fork 0
mirror of https://github.com/ethauvin/rife2.git synced 2025-05-01 11:08:11 -07:00

Workflow refactorings, added informational events, added utility methods to wait for work conditions.

This commit is contained in:
Geert Bevin 2023-01-20 09:25:47 -05:00
parent ee0b02c186
commit 9d04473685
7 changed files with 190 additions and 30 deletions

View file

@ -10,7 +10,7 @@ import rife.continuations.exceptions.ContinuationsNotActiveException;
* Work can be executed in a {@link Workflow}. * Work can be executed in a {@link Workflow}.
* <p>Their execution will be done in a thread by invoking the * <p>Their execution will be done in a thread by invoking the
* {@link #execute} method on a new instance of the work class. * {@link #execute} method on a new instance of the work class.
* <p>Afterwards, work can suspend its execution by waiting for particular * <p>Afterwards, work can suspend its execution by pausing for particular
* event types. The thread will stop executing this work and no system resources * event types. The thread will stop executing this work and no system resources
* will be used except for the memory required to maintain the state of the * will be used except for the memory required to maintain the state of the
* suspended work instance. * suspended work instance.
@ -31,7 +31,7 @@ public interface Work {
void execute(Workflow workflow); void execute(Workflow workflow);
/** /**
* Wait for a particular event type to be triggered in the workflow. * Pause until a particular event type is triggered in the workflow.
* <p>When an event is triggered with a suitable type, is will be returned * <p>When an event is triggered with a suitable type, is will be returned
* through this method call. * through this method call.
* *
@ -39,7 +39,7 @@ public interface Work {
* @return the event that woke up the work * @return the event that woke up the work
* @since 1.0 * @since 1.0
*/ */
default Event waitForEvent(Object type) { default Event pauseForEvent(Object type) {
// this should not be triggered, since bytecode rewriting will replace this // this should not be triggered, since bytecode rewriting will replace this
// method call with the appropriate logic // method call with the appropriate logic
throw new ContinuationsNotActiveException(); throw new ContinuationsNotActiveException();

View file

@ -6,6 +6,8 @@ package rife.workflow;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.*;
import rife.continuations.*; import rife.continuations.*;
import rife.continuations.basic.*; import rife.continuations.basic.*;
@ -13,7 +15,7 @@ import rife.ioc.HierarchicalProperties;
import rife.workflow.config.ContinuationInstrument; import rife.workflow.config.ContinuationInstrument;
/** /**
* Runs work and dispatches events to work that is waiting for it. * Runs work and dispatches events to work that is paused.
* <p>Note that this workflow executes the work, but doesn't create a new * <p>Note that this workflow executes the work, but doesn't create a new
* thread for itself. When a workflow is used, you should take the * thread for itself. When a workflow is used, you should take the
* necessary steps to keep the application running for as long as you need the * necessary steps to keep the application running for as long as you need the
@ -31,6 +33,10 @@ public class Workflow {
private final ConcurrentMap<Object, Set<String>> eventsMapping_; private final ConcurrentMap<Object, Set<String>> eventsMapping_;
private final ConcurrentMap<Object, Queue<Event>> pendingEvents_; private final ConcurrentMap<Object, Queue<Event>> pendingEvents_;
private final Set<EventListener> listeners_; private final Set<EventListener> listeners_;
private final Lock workLock_ = new ReentrantLock();
private final Condition workFinished_ = workLock_.newCondition();
private final Condition workPaused_ = workLock_.newCondition();
private final LongAdder activeWorkCount_ = new LongAdder();
/** /**
* Creates a new workflow instance with a cached thread pool. * Creates a new workflow instance with a cached thread pool.
@ -99,7 +105,11 @@ public class Workflow {
public void start(final Class<? extends Work> klass) { public void start(final Class<? extends Work> klass) {
workExecutor_.submit(() -> { workExecutor_.submit(() -> {
try { try {
activeWorkCount_.increment();
runner_.start(klass); runner_.start(klass);
activeWorkCount_.decrement();
signalWhenAllWorkFinished();
} catch (Throwable e) { } catch (Throwable e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -115,13 +125,80 @@ public class Workflow {
public void start(Work work) { public void start(Work work) {
workExecutor_.submit(() -> { workExecutor_.submit(() -> {
try { try {
activeWorkCount_.increment();
runner_.start(work); runner_.start(work);
activeWorkCount_.decrement();
signalWhenAllWorkFinished();
} catch (Throwable e) { } catch (Throwable e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}); });
} }
private void signalWhenAllWorkFinished() {
if (activeWorkCount_.sum() == 0) {
workLock_.lock();
try {
workFinished_.signalAll();
} finally {
workLock_.unlock();
}
}
}
private void signalWhenWorkIsPaused() {
if (!eventsMapping_.isEmpty()) {
workLock_.lock();
try {
workPaused_.signalAll();
} finally {
workLock_.unlock();
}
}
}
/**
* Convenience method that informs about an event in a workflow.
*
* @param type the type of the event
* @see #inform(Object, Object)
* @see #inform(Event)
* @since 1.0
*/
public void inform(Object type) {
inform(new Event(type, null));
}
/**
* Convenience method that informs about an event in a workflow with
* associated data.
*
* @param type the type of the event
* @param data the data that will be sent with the event
* @see #inform(Object)
* @see #inform(Event)
* @since 1.0
*/
public void inform(Object type, Object data) {
inform(new Event(type, data));
}
/**
* Informs about an event that wakes up work if it is paused for
* the event type.
* <p>If events are informed about and no work is ready to consume them,
* they will be lost. This is different from events being triggered.
*
* @param event the event
* @see #trigger(Object)
* @see #trigger(Object, Object)
* @since 1.0
*/
public void inform(final Event event) {
handleEvent(event, false);
}
/** /**
* Convenience method that triggers an event in a workflow. * Convenience method that triggers an event in a workflow.
* *
@ -149,10 +226,10 @@ public class Workflow {
} }
/** /**
* Triggers an event that wakes up work that is waiting for the event * Triggers an event that wakes up work that is paused for the event
* type. * type.
* <p>If events are triggered, and no work is ready to consume them, they * <p>If events are triggered, and no work is ready to consume them,
* will be queued up until the first available work arrives. * they will be queued up until the first available work arrives.
* *
* @param event the event * @param event the event
* @see #trigger(Object) * @see #trigger(Object)
@ -160,9 +237,13 @@ public class Workflow {
* @since 1.0 * @since 1.0
*/ */
public void trigger(final Event event) { public void trigger(final Event event) {
handleEvent(event, true);
}
private void handleEvent(final Event event, boolean schedulePending) {
if (null == event) return; if (null == event) return;
// retrieve the continuation IDs of the work that is waiting for // retrieve the continuation IDs of the work that is paused for
// the type of the event // the type of the event
// first obtain the collection for this event's type // first obtain the collection for this event's type
@ -171,21 +252,23 @@ public class Workflow {
if (ids != null) { if (ids != null) {
synchronized (ids) { synchronized (ids) {
ids_to_resume.addAll(ids); ids_to_resume.addAll(ids);
ids.clear(); return null;
} }
} }
return ids; return ids;
}); });
if (ids_to_resume.isEmpty()) { if (ids_to_resume.isEmpty()) {
if (schedulePending) {
// couldn't find any continuations to resume, add the event as pending // couldn't find any continuations to resume, add the event as pending
pendingEvents_.compute(event.getType(), (eventType, events) -> { pendingEvents_.compute(event.getType(), (eventType, events) -> {
if (events == null) events = new ConcurrentLinkedQueue<>(); if (events == null) events = new ConcurrentLinkedQueue<>();
events.add(event); events.add(event);
return events; return events;
}); });
}
} else { } else {
// resume all the continuations that are waiting for the event type // resume all the continuations that are paused for the event type
for (var id : ids_to_resume) { for (var id : ids_to_resume) {
answer(id, event); answer(id, event);
} }
@ -195,6 +278,46 @@ public class Workflow {
listeners_.forEach(listener -> listener.eventTriggered(event)); listeners_.forEach(listener -> listener.eventTriggered(event));
} }
/**
* Causes the calling thread to wait until work is paused for events.
*
* @throws InterruptedException when the current thread is interrupted
* @since 1.0
*/
public void waitForPausedWork()
throws InterruptedException {
workLock_.lock();
try {
if (!eventsMapping_.isEmpty()) {
return;
}
workPaused_.await();
} finally {
workLock_.unlock();
}
}
/**
* Causes the calling thread to wait until no more work is running.
*
* @throws InterruptedException when the current thread is interrupted
* @since 1.0
*/
public void waitForNoWork()
throws InterruptedException {
workLock_.lock();
try {
if (activeWorkCount_.sum() == 0) {
return;
}
workFinished_.await();
} finally {
workLock_.unlock();
}
}
/** /**
* Adds a new event listener. * Adds a new event listener.
* *
@ -257,6 +380,8 @@ public class Workflow {
trigger(pending_event[0]); trigger(pending_event[0]);
} }
signalWhenWorkIsPaused();
return null; return null;
} }
} }

View file

@ -34,7 +34,7 @@ public class ContinuationInstrument implements ContinuationConfigInstrument {
} }
public String getCallMethodName() { public String getCallMethodName() {
return "waitForEvent"; return "pauseForEvent";
} }
public Class getCallMethodReturnType() { public Class getCallMethodReturnType() {

View file

@ -12,9 +12,9 @@ import java.util.concurrent.atomic.LongAdder;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
public class WorkTest { public class TestWorkflow {
@Test @Test
void simple() void testCodependency()
throws Throwable { throws Throwable {
final var one_ended = new CountDownLatch(1); final var one_ended = new CountDownLatch(1);
final var all_ended = new CountDownLatch(3); final var all_ended = new CountDownLatch(3);
@ -30,13 +30,28 @@ public class WorkTest {
} }
}); });
workflow.start(new Work1()); workflow.start(new WorkDep1());
workflow.start(Work2.class); workflow.start(WorkDep2.class);
one_ended.await(); one_ended.await();
workflow.start(new Work2()); workflow.start(new WorkDep2());
all_ended.await(); all_ended.await();
assertEquals(45 + 90 + 145, sum.sum()); assertEquals(45 + 90 + 145, sum.sum());
} }
@Test
void testInform()
throws Throwable {
var wf = new Workflow();
wf.inform(TestEventTypes.TYPE1, 1);
var work = new WorkWaitType1();
wf.start(work);
wf.waitForPausedWork();
wf.inform(TestEventTypes.TYPE1, 2);
wf.waitForNoWork();
assertEquals(2, work.getEvent().getData());
}
} }

View file

@ -7,20 +7,20 @@ package rifeworkflowtests;
import rife.workflow.Work; import rife.workflow.Work;
import rife.workflow.Workflow; import rife.workflow.Workflow;
public class Work1 implements Work { public class WorkDep1 implements Work {
public void execute(Workflow workflow) { public void execute(Workflow workflow) {
workflow.trigger(TestEventTypes.BEGIN); workflow.inform(TestEventTypes.BEGIN);
int count; int count;
var sum = 0; var sum = 0;
for (count = 0; count < 20; ++count) { for (count = 0; count < 20; ++count) {
workflow.trigger(TestEventTypes.TYPE2, count); workflow.trigger(TestEventTypes.TYPE2, count);
var event = waitForEvent(TestEventTypes.TYPE1); var event = pauseForEvent(TestEventTypes.TYPE1);
sum += (Integer) event.getData(); sum += (Integer) event.getData();
} }
workflow.trigger(TestEventTypes.END, sum); workflow.inform(TestEventTypes.END, sum);
} }
} }

View file

@ -7,20 +7,20 @@ package rifeworkflowtests;
import rife.workflow.Work; import rife.workflow.Work;
import rife.workflow.Workflow; import rife.workflow.Workflow;
public class Work2 implements Work { public class WorkDep2 implements Work {
public void execute(Workflow workflow) { public void execute(Workflow workflow) {
workflow.trigger(TestEventTypes.BEGIN); workflow.inform(TestEventTypes.BEGIN);
int count; int count;
var sum = 0; var sum = 0;
for (count = 0; count < 10; ++count) { for (count = 0; count < 10; ++count) {
workflow.trigger(TestEventTypes.TYPE1, count); workflow.trigger(TestEventTypes.TYPE1, count);
var event = waitForEvent(TestEventTypes.TYPE2); var event = pauseForEvent(TestEventTypes.TYPE2);
sum += (Integer) event.getData(); sum += (Integer) event.getData();
} }
workflow.trigger(TestEventTypes.END, sum); workflow.inform(TestEventTypes.END, sum);
} }
} }

View file

@ -0,0 +1,20 @@
/*
* Copyright 2001-2023 Geert Bevin (gbevin[remove] at uwyn dot com)
* Licensed under the Apache License, Version 2.0 (the "License")
*/
package rifeworkflowtests;
import rife.workflow.*;
public class WorkWaitType1 implements Work {
private Event event_;
public void execute(Workflow workflow) {
event_ = pauseForEvent(TestEventTypes.TYPE1);
System.out.println(this + " " + event_);
}
public Event getEvent() {
System.out.println(this + " " + event_);
return event_;
}
}