add EventBus

This commit is contained in:
bQUARKz 2026-03-11 13:32:43 +00:00
parent 0c8c15754d
commit 4e27389df7
Signed by: bquarkz
SSH Key Fingerprint: SHA256:Z7dgqoglWwoK6j6u4QC87OveEq74WOhFN+gitsxtkf8
10 changed files with 284 additions and 0 deletions

View File

@ -0,0 +1,5 @@
package p.studio.utilities.events;
public interface EventSubscription {
void unsubscribe();
}

View File

@ -0,0 +1,4 @@
package p.studio.utilities.events;
public interface SimpleEvent {
}

View File

@ -0,0 +1,55 @@
package p.studio.utilities.events;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
public final class TypedEventBus {
private final AtomicLong nextSubscriptionId = new AtomicLong(1L);
private final ConcurrentMap<Class<?>, ConcurrentMap<Long, Consumer<Object>>> subscriptionsByType =
new ConcurrentHashMap<>();
public <T extends SimpleEvent> EventSubscription subscribe(Class<T> eventType, Consumer<? super T> consumer) {
Objects.requireNonNull(eventType, "eventType");
Objects.requireNonNull(consumer, "consumer");
final long subscriptionId = nextSubscriptionId.getAndIncrement();
final Consumer<Object> erasedConsumer = event -> consumer.accept(eventType.cast(event));
final ConcurrentMap<Long, Consumer<Object>> subscriptions =
subscriptionsByType.computeIfAbsent(eventType, ignored -> new ConcurrentHashMap<>());
subscriptions.put(subscriptionId, erasedConsumer);
final AtomicBoolean active = new AtomicBoolean(true);
return () -> {
if (!active.compareAndSet(true, false)) {
return;
}
final ConcurrentMap<Long, Consumer<Object>> currentSubscriptions = subscriptionsByType.get(eventType);
if (currentSubscriptions == null) {
return;
}
currentSubscriptions.remove(subscriptionId);
if (currentSubscriptions.isEmpty()) {
subscriptionsByType.remove(eventType, currentSubscriptions);
}
};
}
public void publish(Object event) {
Objects.requireNonNull(event, "event");
final ConcurrentMap<Long, Consumer<Object>> subscriptions = subscriptionsByType.get(event.getClass());
if (subscriptions == null || subscriptions.isEmpty()) {
return;
}
for (Consumer<Object> consumer : subscriptions.values()) {
consumer.accept(event);
}
}
}

View File

@ -0,0 +1,97 @@
package p.studio.utilities.events;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
final class TypedEventBusTest {
@Test
void publishesTypedEventsToSubscribers() {
final TypedEventBus bus = new TypedEventBus();
final List<String> received = new CopyOnWriteArrayList<>();
bus.subscribe(TestEvent.class, event -> received.add(event.value()));
bus.publish(new TestEvent("hello"));
assertEquals(List.of("hello"), received);
}
@Test
void unsubscribedListenersStopReceivingEvents() {
final TypedEventBus bus = new TypedEventBus();
final AtomicInteger calls = new AtomicInteger();
final EventSubscription subscription = bus.subscribe(TestEvent.class, event -> calls.incrementAndGet());
subscription.unsubscribe();
bus.publish(new TestEvent("hello"));
assertEquals(0, calls.get());
}
@Test
void publishAndSubscriptionManagementAreThreadSafe() throws Exception {
final TypedEventBus bus = new TypedEventBus();
final AtomicInteger received = new AtomicInteger();
final int publisherCount = 4;
final int eventsPerPublisher = 250;
final int subscriptionCount = 12;
final CountDownLatch ready = new CountDownLatch(publisherCount + subscriptionCount);
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(publisherCount + subscriptionCount);
try (final var executor = Executors.newFixedThreadPool(publisherCount + subscriptionCount)) {
for (int i = 0; i < subscriptionCount; i++) {
executor.submit(() -> {
ready.countDown();
await(start);
final EventSubscription subscription =
bus.subscribe(TestEvent.class, event -> received.incrementAndGet());
bus.publish(new TestEvent("subscription-ready"));
subscription.unsubscribe();
done.countDown();
});
}
for (int i = 0; i < publisherCount; i++) {
executor.submit(() -> {
ready.countDown();
await(start);
for (int j = 0; j < eventsPerPublisher; j++) {
bus.publish(new TestEvent("payload-" + j));
}
done.countDown();
});
}
assertTrue(ready.await(5, TimeUnit.SECONDS));
start.countDown();
assertTrue(done.await(5, TimeUnit.SECONDS));
executor.shutdownNow();
assertTrue(received.get() >= 0);
}
}
private static void await(CountDownLatch latch) {
try {
latch.await();
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
throw new AssertionError(interruptedException);
}
}
private record TestEvent(String value) implements SimpleEvent {
}
}

View File

@ -1,15 +1,18 @@
package p.studio; package p.studio;
import p.studio.events.StudioEventBus;
import p.studio.utilities.ThemeService; import p.studio.utilities.ThemeService;
import p.studio.utilities.i18n.I18nService; import p.studio.utilities.i18n.I18nService;
public class Container { public class Container {
private static final I18nService i18nService; private static final I18nService i18nService;
private static final ThemeService themeService; private static final ThemeService themeService;
private static final StudioEventBus studioEventBus;
static { static {
i18nService = new I18nService(); i18nService = new I18nService();
themeService = new ThemeService(); themeService = new ThemeService();
studioEventBus = new StudioEventBus();
} }
public static void init() { public static void init() {
@ -18,4 +21,6 @@ public class Container {
public static I18nService i18n() { return i18nService; } public static I18nService i18n() { return i18nService; }
public static ThemeService theme() { return new ThemeService(); } public static ThemeService theme() { return new ThemeService(); }
public static StudioEventBus events() { return studioEventBus; }
} }

View File

@ -0,0 +1,6 @@
package p.studio.events;
import p.studio.utilities.events.SimpleEvent;
public interface StudioEvent extends SimpleEvent {
}

View File

@ -0,0 +1,27 @@
package p.studio.events;
import p.studio.utilities.events.EventSubscription;
import p.studio.utilities.events.TypedEventBus;
import java.util.Objects;
import java.util.function.Consumer;
public final class StudioEventBus {
private final TypedEventBus delegate;
public StudioEventBus() {
this(new TypedEventBus());
}
public StudioEventBus(TypedEventBus delegate) {
this.delegate = Objects.requireNonNull(delegate, "delegate");
}
public <T extends StudioEvent> EventSubscription subscribe(Class<T> eventType, Consumer<? super T> consumer) {
return delegate.subscribe(eventType, consumer);
}
public void publish(StudioEvent event) {
delegate.publish(event);
}
}

View File

@ -0,0 +1,33 @@
package p.studio.events;
import p.studio.utilities.events.EventSubscription;
import p.studio.utilities.events.TypedEventBus;
import p.studio.workspaces.WorkspaceId;
import java.util.Objects;
import java.util.function.Consumer;
public final class StudioWorkspaceEventBus {
private final WorkspaceId workspaceId;
private final StudioEventBus globalBus;
private final TypedEventBus localBus;
public StudioWorkspaceEventBus(WorkspaceId workspaceId, StudioEventBus globalBus) {
this.workspaceId = Objects.requireNonNull(workspaceId, "workspaceId");
this.globalBus = Objects.requireNonNull(globalBus, "globalBus");
this.localBus = new TypedEventBus();
}
public WorkspaceId workspaceId() {
return workspaceId;
}
public <T extends StudioEvent> EventSubscription subscribe(Class<T> eventType, Consumer<? super T> consumer) {
return localBus.subscribe(eventType, consumer);
}
public void publish(StudioEvent event) {
localBus.publish(event);
globalBus.publish(event);
}
}

View File

@ -0,0 +1,6 @@
package p.studio.events;
import p.studio.workspaces.WorkspaceId;
public record StudioWorkspaceSelectedEvent(WorkspaceId workspaceId) implements StudioEvent {
}

View File

@ -0,0 +1,46 @@
package p.studio.events;
import org.junit.jupiter.api.Test;
import p.studio.workspaces.WorkspaceId;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import static org.junit.jupiter.api.Assertions.assertEquals;
final class StudioWorkspaceEventBusTest {
@Test
void workspaceEventsAreObservedLocallyAndGlobally() {
final StudioEventBus globalBus = new StudioEventBus();
final StudioWorkspaceEventBus workspaceBus = new StudioWorkspaceEventBus(WorkspaceId.ASSETS, globalBus);
final List<String> localReceived = new CopyOnWriteArrayList<>();
final List<String> globalReceived = new CopyOnWriteArrayList<>();
workspaceBus.subscribe(TestStudioEvent.class, event -> localReceived.add(event.name()));
globalBus.subscribe(TestStudioEvent.class, event -> globalReceived.add(event.name()));
workspaceBus.publish(new TestStudioEvent("asset-discovered"));
assertEquals(List.of("asset-discovered"), localReceived);
assertEquals(List.of("asset-discovered"), globalReceived);
}
@Test
void globalEventsAreNotRebroadcastBackIntoWorkspaceBus() {
final StudioEventBus globalBus = new StudioEventBus();
final StudioWorkspaceEventBus workspaceBus = new StudioWorkspaceEventBus(WorkspaceId.ASSETS, globalBus);
final List<String> localReceived = new CopyOnWriteArrayList<>();
final List<String> globalReceived = new CopyOnWriteArrayList<>();
workspaceBus.subscribe(TestStudioEvent.class, event -> localReceived.add(event.name()));
globalBus.subscribe(TestStudioEvent.class, event -> globalReceived.add(event.name()));
globalBus.publish(new TestStudioEvent("global-only"));
assertEquals(List.of(), localReceived);
assertEquals(List.of("global-only"), globalReceived);
}
private record TestStudioEvent(String name) implements StudioEvent {
}
}