diff --git a/prometeu-infra/src/main/java/p/studio/utilities/events/EventSubscription.java b/prometeu-infra/src/main/java/p/studio/utilities/events/EventSubscription.java new file mode 100644 index 00000000..0e6c9894 --- /dev/null +++ b/prometeu-infra/src/main/java/p/studio/utilities/events/EventSubscription.java @@ -0,0 +1,5 @@ +package p.studio.utilities.events; + +public interface EventSubscription { + void unsubscribe(); +} diff --git a/prometeu-infra/src/main/java/p/studio/utilities/events/SimpleEvent.java b/prometeu-infra/src/main/java/p/studio/utilities/events/SimpleEvent.java new file mode 100644 index 00000000..5761fcd9 --- /dev/null +++ b/prometeu-infra/src/main/java/p/studio/utilities/events/SimpleEvent.java @@ -0,0 +1,4 @@ +package p.studio.utilities.events; + +public interface SimpleEvent { +} diff --git a/prometeu-infra/src/main/java/p/studio/utilities/events/TypedEventBus.java b/prometeu-infra/src/main/java/p/studio/utilities/events/TypedEventBus.java new file mode 100644 index 00000000..d57705fd --- /dev/null +++ b/prometeu-infra/src/main/java/p/studio/utilities/events/TypedEventBus.java @@ -0,0 +1,55 @@ +package p.studio.utilities.events; + +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +public final class TypedEventBus { + private final AtomicLong nextSubscriptionId = new AtomicLong(1L); + private final ConcurrentMap, ConcurrentMap>> subscriptionsByType = + new ConcurrentHashMap<>(); + + public EventSubscription subscribe(Class eventType, Consumer consumer) { + Objects.requireNonNull(eventType, "eventType"); + Objects.requireNonNull(consumer, "consumer"); + + final long subscriptionId = nextSubscriptionId.getAndIncrement(); + final Consumer erasedConsumer = event -> consumer.accept(eventType.cast(event)); + final ConcurrentMap> subscriptions = + subscriptionsByType.computeIfAbsent(eventType, ignored -> new ConcurrentHashMap<>()); + subscriptions.put(subscriptionId, erasedConsumer); + + final AtomicBoolean active = new AtomicBoolean(true); + return () -> { + if (!active.compareAndSet(true, false)) { + return; + } + + final ConcurrentMap> currentSubscriptions = subscriptionsByType.get(eventType); + if (currentSubscriptions == null) { + return; + } + + currentSubscriptions.remove(subscriptionId); + if (currentSubscriptions.isEmpty()) { + subscriptionsByType.remove(eventType, currentSubscriptions); + } + }; + } + + public void publish(Object event) { + Objects.requireNonNull(event, "event"); + + final ConcurrentMap> subscriptions = subscriptionsByType.get(event.getClass()); + if (subscriptions == null || subscriptions.isEmpty()) { + return; + } + + for (Consumer consumer : subscriptions.values()) { + consumer.accept(event); + } + } +} diff --git a/prometeu-infra/src/test/java/p/studio/utilities/events/TypedEventBusTest.java b/prometeu-infra/src/test/java/p/studio/utilities/events/TypedEventBusTest.java new file mode 100644 index 00000000..25c0e833 --- /dev/null +++ b/prometeu-infra/src/test/java/p/studio/utilities/events/TypedEventBusTest.java @@ -0,0 +1,97 @@ +package p.studio.utilities.events; + +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +final class TypedEventBusTest { + @Test + void publishesTypedEventsToSubscribers() { + final TypedEventBus bus = new TypedEventBus(); + final List received = new CopyOnWriteArrayList<>(); + + bus.subscribe(TestEvent.class, event -> received.add(event.value())); + bus.publish(new TestEvent("hello")); + + assertEquals(List.of("hello"), received); + } + + @Test + void unsubscribedListenersStopReceivingEvents() { + final TypedEventBus bus = new TypedEventBus(); + final AtomicInteger calls = new AtomicInteger(); + + final EventSubscription subscription = bus.subscribe(TestEvent.class, event -> calls.incrementAndGet()); + subscription.unsubscribe(); + + bus.publish(new TestEvent("hello")); + + assertEquals(0, calls.get()); + } + + @Test + void publishAndSubscriptionManagementAreThreadSafe() throws Exception { + final TypedEventBus bus = new TypedEventBus(); + final AtomicInteger received = new AtomicInteger(); + final int publisherCount = 4; + final int eventsPerPublisher = 250; + final int subscriptionCount = 12; + + final CountDownLatch ready = new CountDownLatch(publisherCount + subscriptionCount); + final CountDownLatch start = new CountDownLatch(1); + final CountDownLatch done = new CountDownLatch(publisherCount + subscriptionCount); + try (final var executor = Executors.newFixedThreadPool(publisherCount + subscriptionCount)) { + + for (int i = 0; i < subscriptionCount; i++) { + executor.submit(() -> { + ready.countDown(); + await(start); + + final EventSubscription subscription = + bus.subscribe(TestEvent.class, event -> received.incrementAndGet()); + bus.publish(new TestEvent("subscription-ready")); + subscription.unsubscribe(); + done.countDown(); + }); + } + + for (int i = 0; i < publisherCount; i++) { + executor.submit(() -> { + ready.countDown(); + await(start); + + for (int j = 0; j < eventsPerPublisher; j++) { + bus.publish(new TestEvent("payload-" + j)); + } + done.countDown(); + }); + } + + assertTrue(ready.await(5, TimeUnit.SECONDS)); + start.countDown(); + assertTrue(done.await(5, TimeUnit.SECONDS)); + executor.shutdownNow(); + assertTrue(received.get() >= 0); + } + } + + private static void await(CountDownLatch latch) { + try { + latch.await(); + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + throw new AssertionError(interruptedException); + } + } + + private record TestEvent(String value) implements SimpleEvent { + } +} diff --git a/prometeu-studio/src/main/java/p/studio/Container.java b/prometeu-studio/src/main/java/p/studio/Container.java index 048bedfe..2ee7e1f7 100644 --- a/prometeu-studio/src/main/java/p/studio/Container.java +++ b/prometeu-studio/src/main/java/p/studio/Container.java @@ -1,15 +1,18 @@ package p.studio; +import p.studio.events.StudioEventBus; import p.studio.utilities.ThemeService; import p.studio.utilities.i18n.I18nService; public class Container { private static final I18nService i18nService; private static final ThemeService themeService; + private static final StudioEventBus studioEventBus; static { i18nService = new I18nService(); themeService = new ThemeService(); + studioEventBus = new StudioEventBus(); } public static void init() { @@ -18,4 +21,6 @@ public class Container { public static I18nService i18n() { return i18nService; } public static ThemeService theme() { return new ThemeService(); } + + public static StudioEventBus events() { return studioEventBus; } } diff --git a/prometeu-studio/src/main/java/p/studio/events/StudioEvent.java b/prometeu-studio/src/main/java/p/studio/events/StudioEvent.java new file mode 100644 index 00000000..7631be37 --- /dev/null +++ b/prometeu-studio/src/main/java/p/studio/events/StudioEvent.java @@ -0,0 +1,6 @@ +package p.studio.events; + +import p.studio.utilities.events.SimpleEvent; + +public interface StudioEvent extends SimpleEvent { +} diff --git a/prometeu-studio/src/main/java/p/studio/events/StudioEventBus.java b/prometeu-studio/src/main/java/p/studio/events/StudioEventBus.java new file mode 100644 index 00000000..bdaf4850 --- /dev/null +++ b/prometeu-studio/src/main/java/p/studio/events/StudioEventBus.java @@ -0,0 +1,27 @@ +package p.studio.events; + +import p.studio.utilities.events.EventSubscription; +import p.studio.utilities.events.TypedEventBus; + +import java.util.Objects; +import java.util.function.Consumer; + +public final class StudioEventBus { + private final TypedEventBus delegate; + + public StudioEventBus() { + this(new TypedEventBus()); + } + + public StudioEventBus(TypedEventBus delegate) { + this.delegate = Objects.requireNonNull(delegate, "delegate"); + } + + public EventSubscription subscribe(Class eventType, Consumer consumer) { + return delegate.subscribe(eventType, consumer); + } + + public void publish(StudioEvent event) { + delegate.publish(event); + } +} diff --git a/prometeu-studio/src/main/java/p/studio/events/StudioWorkspaceEventBus.java b/prometeu-studio/src/main/java/p/studio/events/StudioWorkspaceEventBus.java new file mode 100644 index 00000000..fcbcd62b --- /dev/null +++ b/prometeu-studio/src/main/java/p/studio/events/StudioWorkspaceEventBus.java @@ -0,0 +1,33 @@ +package p.studio.events; + +import p.studio.utilities.events.EventSubscription; +import p.studio.utilities.events.TypedEventBus; +import p.studio.workspaces.WorkspaceId; + +import java.util.Objects; +import java.util.function.Consumer; + +public final class StudioWorkspaceEventBus { + private final WorkspaceId workspaceId; + private final StudioEventBus globalBus; + private final TypedEventBus localBus; + + public StudioWorkspaceEventBus(WorkspaceId workspaceId, StudioEventBus globalBus) { + this.workspaceId = Objects.requireNonNull(workspaceId, "workspaceId"); + this.globalBus = Objects.requireNonNull(globalBus, "globalBus"); + this.localBus = new TypedEventBus(); + } + + public WorkspaceId workspaceId() { + return workspaceId; + } + + public EventSubscription subscribe(Class eventType, Consumer consumer) { + return localBus.subscribe(eventType, consumer); + } + + public void publish(StudioEvent event) { + localBus.publish(event); + globalBus.publish(event); + } +} diff --git a/prometeu-studio/src/main/java/p/studio/events/StudioWorkspaceSelectedEvent.java b/prometeu-studio/src/main/java/p/studio/events/StudioWorkspaceSelectedEvent.java new file mode 100644 index 00000000..b0232dcd --- /dev/null +++ b/prometeu-studio/src/main/java/p/studio/events/StudioWorkspaceSelectedEvent.java @@ -0,0 +1,6 @@ +package p.studio.events; + +import p.studio.workspaces.WorkspaceId; + +public record StudioWorkspaceSelectedEvent(WorkspaceId workspaceId) implements StudioEvent { +} diff --git a/prometeu-studio/src/test/java/p/studio/events/StudioWorkspaceEventBusTest.java b/prometeu-studio/src/test/java/p/studio/events/StudioWorkspaceEventBusTest.java new file mode 100644 index 00000000..600896fc --- /dev/null +++ b/prometeu-studio/src/test/java/p/studio/events/StudioWorkspaceEventBusTest.java @@ -0,0 +1,46 @@ +package p.studio.events; + +import org.junit.jupiter.api.Test; +import p.studio.workspaces.WorkspaceId; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +final class StudioWorkspaceEventBusTest { + @Test + void workspaceEventsAreObservedLocallyAndGlobally() { + final StudioEventBus globalBus = new StudioEventBus(); + final StudioWorkspaceEventBus workspaceBus = new StudioWorkspaceEventBus(WorkspaceId.ASSETS, globalBus); + final List localReceived = new CopyOnWriteArrayList<>(); + final List globalReceived = new CopyOnWriteArrayList<>(); + + workspaceBus.subscribe(TestStudioEvent.class, event -> localReceived.add(event.name())); + globalBus.subscribe(TestStudioEvent.class, event -> globalReceived.add(event.name())); + + workspaceBus.publish(new TestStudioEvent("asset-discovered")); + + assertEquals(List.of("asset-discovered"), localReceived); + assertEquals(List.of("asset-discovered"), globalReceived); + } + + @Test + void globalEventsAreNotRebroadcastBackIntoWorkspaceBus() { + final StudioEventBus globalBus = new StudioEventBus(); + final StudioWorkspaceEventBus workspaceBus = new StudioWorkspaceEventBus(WorkspaceId.ASSETS, globalBus); + final List localReceived = new CopyOnWriteArrayList<>(); + final List globalReceived = new CopyOnWriteArrayList<>(); + + workspaceBus.subscribe(TestStudioEvent.class, event -> localReceived.add(event.name())); + globalBus.subscribe(TestStudioEvent.class, event -> globalReceived.add(event.name())); + + globalBus.publish(new TestStudioEvent("global-only")); + + assertEquals(List.of(), localReceived); + assertEquals(List.of("global-only"), globalReceived); + } + + private record TestStudioEvent(String name) implements StudioEvent { + } +}