From 17f9a190d5e9003613147587ec4ba8842c89c500 Mon Sep 17 00:00:00 2001 From: bQUARKz Date: Mon, 6 Apr 2026 06:43:42 +0100 Subject: [PATCH] implements PLN-0044 runtime handshake integration --- .../StudioRuntimeDebugConnectionSettings.java | 19 ++ ...StudioRuntimeHandshakeClientToRuntime.java | 13 + .../runtime/StudioRuntimeHandshakeResult.java | 22 ++ ...StudioRuntimeHandshakeRuntimeToClient.java | 17 ++ .../StudioRuntimeHandshakeService.java | 241 ++++++++++++++++++ .../debug/runtime/StudioRuntimeLogEvent.java | 12 + .../StudioRuntimeHandshakeServiceTest.java | 108 ++++++++ 7 files changed, 432 insertions(+) create mode 100644 prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeDebugConnectionSettings.java create mode 100644 prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeHandshakeClientToRuntime.java create mode 100644 prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeHandshakeResult.java create mode 100644 prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeHandshakeRuntimeToClient.java create mode 100644 prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeHandshakeService.java create mode 100644 prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeLogEvent.java create mode 100644 prometeu-studio/src/test/java/p/studio/debug/runtime/StudioRuntimeHandshakeServiceTest.java diff --git a/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeDebugConnectionSettings.java b/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeDebugConnectionSettings.java new file mode 100644 index 00000000..298d6806 --- /dev/null +++ b/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeDebugConnectionSettings.java @@ -0,0 +1,19 @@ +package p.studio.debug.runtime; + +public record StudioRuntimeDebugConnectionSettings( + String host, + int port) { + public static final String DEFAULT_HOST = "127.0.0.1"; + public static final int DEFAULT_PORT = 7777; + + public StudioRuntimeDebugConnectionSettings { + host = host == null || host.isBlank() ? DEFAULT_HOST : host.trim(); + if (port <= 0) { + port = DEFAULT_PORT; + } + } + + public static StudioRuntimeDebugConnectionSettings defaults() { + return new StudioRuntimeDebugConnectionSettings(DEFAULT_HOST, DEFAULT_PORT); + } +} diff --git a/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeHandshakeClientToRuntime.java b/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeHandshakeClientToRuntime.java new file mode 100644 index 00000000..8c7efe03 --- /dev/null +++ b/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeHandshakeClientToRuntime.java @@ -0,0 +1,13 @@ +package p.studio.debug.runtime; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public record StudioRuntimeHandshakeClientToRuntime( + @JsonProperty("type") String type) { + + public StudioRuntimeHandshakeClientToRuntime { + if (!"start".equals(type) && !"ok".equals(type)) { + throw new IllegalArgumentException("type must be 'start' or 'ok'"); + } + } +} diff --git a/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeHandshakeResult.java b/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeHandshakeResult.java new file mode 100644 index 00000000..fe6807bf --- /dev/null +++ b/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeHandshakeResult.java @@ -0,0 +1,22 @@ +package p.studio.debug.runtime; + +public record StudioRuntimeHandshakeResult( + boolean success, + StudioRuntimeDebugConnectionSettings settings, + Integer protocolVersion, + String runtimeVersion, + String failureMessage) { + + public static StudioRuntimeHandshakeResult success( + final StudioRuntimeDebugConnectionSettings settings, + final int protocolVersion, + final String runtimeVersion) { + return new StudioRuntimeHandshakeResult(true, settings, protocolVersion, runtimeVersion, null); + } + + public static StudioRuntimeHandshakeResult failure( + final StudioRuntimeDebugConnectionSettings settings, + final String failureMessage) { + return new StudioRuntimeHandshakeResult(false, settings, null, null, failureMessage); + } +} diff --git a/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeHandshakeRuntimeToClient.java b/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeHandshakeRuntimeToClient.java new file mode 100644 index 00000000..c532be71 --- /dev/null +++ b/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeHandshakeRuntimeToClient.java @@ -0,0 +1,17 @@ +package p.studio.debug.runtime; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +@JsonIgnoreProperties(ignoreUnknown = true) +public record StudioRuntimeHandshakeRuntimeToClient( + @JsonProperty("type") String type, + @JsonProperty("protocol_version") int protocolVersion, + @JsonProperty("runtime_version") String runtimeVersion) { + + public StudioRuntimeHandshakeRuntimeToClient { + if (!"handshake".equals(type)) { + throw new IllegalArgumentException("type must be 'handshake'"); + } + } +} diff --git a/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeHandshakeService.java b/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeHandshakeService.java new file mode 100644 index 00000000..175e5eda --- /dev/null +++ b/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeHandshakeService.java @@ -0,0 +1,241 @@ +package p.studio.debug.runtime; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import p.studio.Container; +import p.studio.StudioBackgroundTasks; +import p.studio.execution.StudioExecutionLogSeverity; +import p.studio.execution.StudioExecutionSessionService; +import p.studio.execution.StudioExecutionState; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.concurrent.Future; + +public final class StudioRuntimeHandshakeService { + private final ObjectMapper mapper; + private final StudioBackgroundTasks backgroundTasks; + private final Object monitor = new Object(); + private Socket socket; + private BufferedReader reader; + private BufferedWriter writer; + private Future receiverTask; + private boolean intentionalDisconnect; + + public StudioRuntimeHandshakeService() { + this(Container.mapper(), Container.backgroundTasks()); + } + + public StudioRuntimeHandshakeService( + final ObjectMapper mapper, + final StudioBackgroundTasks backgroundTasks) { + this.mapper = Objects.requireNonNull(mapper, "mapper"); + this.backgroundTasks = Objects.requireNonNull(backgroundTasks, "backgroundTasks"); + } + + public StudioRuntimeHandshakeResult connect(final StudioExecutionSessionService session) { + return connect(session, StudioRuntimeDebugConnectionSettings.defaults()); + } + + public StudioRuntimeHandshakeResult connect( + final StudioExecutionSessionService session, + final StudioRuntimeDebugConnectionSettings settings) { + final StudioExecutionSessionService safeSession = Objects.requireNonNull(session, "session"); + final StudioRuntimeDebugConnectionSettings safeSettings = Objects.requireNonNull(settings, "settings"); + safeSession.transitionToConnecting(); + try { + synchronized (monitor) { + cleanupLocked(); + intentionalDisconnect = false; + socket = new Socket(); + socket.connect(new InetSocketAddress(safeSettings.host(), safeSettings.port()), 5_000); + socket.setSoTimeout(5_000); + reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); + writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)); + } + + final String handshakeLine = readHandshakeLine(); + final StudioRuntimeHandshakeRuntimeToClient handshake = mapper.readValue( + handshakeLine, + StudioRuntimeHandshakeRuntimeToClient.class); + writeMessage(mapper.writeValueAsString(new StudioRuntimeHandshakeClientToRuntime("start"))); + synchronized (monitor) { + if (socket != null) { + socket.setSoTimeout(0); + } + receiverTask = backgroundTasks.submit(() -> receiveLoop(safeSession)); + } + + safeSession.appendRuntimeLog( + StudioExecutionLogSeverity.INFO, + "Runtime handshake connected to %s:%d".formatted(safeSettings.host(), safeSettings.port())); + safeSession.appendRuntimeLog( + StudioExecutionLogSeverity.INFO, + "Runtime protocol %d (%s)".formatted(handshake.protocolVersion(), handshake.runtimeVersion())); + safeSession.transitionToRunning(); + return StudioRuntimeHandshakeResult.success(safeSettings, handshake.protocolVersion(), handshake.runtimeVersion()); + } catch (Exception exception) { + cleanup(); + safeSession.appendRuntimeLog( + StudioExecutionLogSeverity.ERROR, + "Runtime handshake failed: " + failureMessage(exception)); + safeSession.transitionToRuntimeFailed(); + return StudioRuntimeHandshakeResult.failure(safeSettings, failureMessage(exception)); + } + } + + public void disconnect(final StudioExecutionSessionService session) { + final StudioExecutionSessionService safeSession = Objects.requireNonNull(session, "session"); + final boolean wasConnected; + synchronized (monitor) { + wasConnected = socket != null; + intentionalDisconnect = true; + cleanupLocked(); + } + if (wasConnected) { + safeSession.appendRuntimeLog(StudioExecutionLogSeverity.INFO, "Runtime debug connection disconnected."); + final StudioExecutionState currentState = safeSession.snapshot().state(); + if (currentState == StudioExecutionState.CONNECTING || currentState == StudioExecutionState.RUNNING) { + safeSession.transitionToStopped(); + } + } + } + + private void receiveLoop(final StudioExecutionSessionService session) { + try { + String line; + while ((line = readRuntimeLine()) != null) { + processRuntimeLine(session, line); + } + handleUnexpectedDisconnect(session, "Runtime debug connection closed."); + } catch (IOException ioException) { + handleUnexpectedDisconnect(session, "Runtime debug connection lost: " + failureMessage(ioException)); + } + } + + private void processRuntimeLine( + final StudioExecutionSessionService session, + final String line) throws IOException { + final JsonNode node = mapper.readTree(line); + if (node == null || !node.has("event")) { + return; + } + if ("log".equals(node.get("event").asText())) { + final StudioRuntimeLogEvent event = mapper.treeToValue(node, StudioRuntimeLogEvent.class); + session.appendRuntimeLog( + severityOf(event.level()), + renderRuntimeMessage(event)); + } + } + + private String renderRuntimeMessage(final StudioRuntimeLogEvent event) { + if (event.source() == null || event.source().isBlank()) { + return event.message(); + } + return "[%s] %s".formatted(event.source(), event.message()); + } + + private StudioExecutionLogSeverity severityOf(final String level) { + if (level == null) { + return StudioExecutionLogSeverity.INFO; + } + return switch (level.trim().toUpperCase()) { + case "ERROR", "FATAL" -> StudioExecutionLogSeverity.ERROR; + default -> StudioExecutionLogSeverity.INFO; + }; + } + + private String readHandshakeLine() throws IOException { + synchronized (monitor) { + if (reader == null) { + throw new IOException("runtime debug reader not available"); + } + final String line = reader.readLine(); + if (line == null) { + throw new IOException("connection closed during handshake"); + } + return line; + } + } + + private String readRuntimeLine() throws IOException { + synchronized (monitor) { + if (reader == null) { + return null; + } + return reader.readLine(); + } + } + + private void writeMessage(final String message) throws IOException { + synchronized (monitor) { + if (writer == null) { + throw new IOException("runtime debug writer not available"); + } + writer.write(message); + writer.newLine(); + writer.flush(); + } + } + + private void handleUnexpectedDisconnect( + final StudioExecutionSessionService session, + final String message) { + final boolean shouldReport; + synchronized (monitor) { + shouldReport = !intentionalDisconnect; + cleanupLocked(); + } + if (!shouldReport) { + return; + } + session.appendRuntimeLog(StudioExecutionLogSeverity.ERROR, message); + final StudioExecutionState currentState = session.snapshot().state(); + if (currentState == StudioExecutionState.CONNECTING || currentState == StudioExecutionState.RUNNING) { + session.transitionToRuntimeFailed(); + } + } + + private void cleanup() { + synchronized (monitor) { + cleanupLocked(); + } + } + + private void cleanupLocked() { + if (receiverTask != null) { + receiverTask.cancel(true); + receiverTask = null; + } + closeQuietly(reader); + closeQuietly(writer); + closeQuietly(socket); + reader = null; + writer = null; + socket = null; + } + + private void closeQuietly(final AutoCloseable closeable) { + if (closeable == null) { + return; + } + try { + closeable.close(); + } catch (Exception ignored) { + } + } + + private String failureMessage(final Exception exception) { + final String message = exception.getMessage(); + return message == null || message.isBlank() + ? exception.getClass().getSimpleName() + : message; + } +} diff --git a/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeLogEvent.java b/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeLogEvent.java new file mode 100644 index 00000000..9a057806 --- /dev/null +++ b/prometeu-studio/src/main/java/p/studio/debug/runtime/StudioRuntimeLogEvent.java @@ -0,0 +1,12 @@ +package p.studio.debug.runtime; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +@JsonIgnoreProperties(ignoreUnknown = true) +public record StudioRuntimeLogEvent( + @JsonProperty("event") String event, + @JsonProperty("level") String level, + @JsonProperty("source") String source, + @JsonProperty("msg") String message) { +} diff --git a/prometeu-studio/src/test/java/p/studio/debug/runtime/StudioRuntimeHandshakeServiceTest.java b/prometeu-studio/src/test/java/p/studio/debug/runtime/StudioRuntimeHandshakeServiceTest.java new file mode 100644 index 00000000..825fcdda --- /dev/null +++ b/prometeu-studio/src/test/java/p/studio/debug/runtime/StudioRuntimeHandshakeServiceTest.java @@ -0,0 +1,108 @@ +package p.studio.debug.runtime; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; +import p.studio.StudioBackgroundTasks; +import p.studio.execution.StudioExecutionSessionService; +import p.studio.execution.StudioExecutionState; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.*; + +final class StudioRuntimeHandshakeServiceTest { + @Test + void connectsPublishesRuntimeHandshakeAndStreamsLogEvents() throws Exception { + try (final ServerSocket serverSocket = new ServerSocket(0)) { + final CountDownLatch clientStarted = new CountDownLatch(1); + final CountDownLatch eventDelivered = new CountDownLatch(1); + final Thread serverThread = new Thread(() -> serveSuccessfulHandshake(serverSocket, clientStarted, eventDelivered)); + serverThread.start(); + + final StudioExecutionSessionService session = new StudioExecutionSessionService(); + session.beginPreparationCycle(); + final StudioRuntimeDebugConnectionSettings settings = new StudioRuntimeDebugConnectionSettings("127.0.0.1", serverSocket.getLocalPort()); + final StudioBackgroundTasks backgroundTasks = new StudioBackgroundTasks(Executors.newSingleThreadExecutor()); + try { + final StudioRuntimeHandshakeService service = new StudioRuntimeHandshakeService(new ObjectMapper(), backgroundTasks); + final StudioRuntimeHandshakeResult result = service.connect(session, settings); + + assertTrue(result.success()); + assertEquals(StudioExecutionState.RUNNING, session.snapshot().state()); + assertTrue(clientStarted.await(2, TimeUnit.SECONDS)); + assertTrue(eventDelivered.await(2, TimeUnit.SECONDS)); + assertTrue(session.snapshot().logs().stream().anyMatch(entry -> entry.message().contains("Runtime handshake connected"))); + assertTrue(waitForRuntimeLog(session, "[VM] Boot complete")); + + service.disconnect(session); + } finally { + backgroundTasks.shutdown(); + serverThread.join(2_000); + } + } + } + + @Test + void handshakeFailureTransitionsSessionToRuntimeFailed() { + final StudioExecutionSessionService session = new StudioExecutionSessionService(); + session.beginPreparationCycle(); + final StudioBackgroundTasks backgroundTasks = new StudioBackgroundTasks(Executors.newSingleThreadExecutor()); + try { + final StudioRuntimeHandshakeService service = new StudioRuntimeHandshakeService(new ObjectMapper(), backgroundTasks); + final StudioRuntimeHandshakeResult result = service.connect( + session, + new StudioRuntimeDebugConnectionSettings("127.0.0.1", 1)); + + assertFalse(result.success()); + assertEquals(StudioExecutionState.RUNTIME_FAILED, session.snapshot().state()); + assertTrue(session.snapshot().logs().stream().anyMatch(entry -> entry.message().contains("Runtime handshake failed"))); + } finally { + backgroundTasks.shutdown(); + } + } + + private void serveSuccessfulHandshake( + final ServerSocket serverSocket, + final CountDownLatch clientStarted, + final CountDownLatch eventDelivered) { + try (Socket socket = serverSocket.accept(); + BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8))) { + writer.write("{\"type\":\"handshake\",\"protocol_version\":1,\"runtime_version\":\"1.0.0\"}"); + writer.newLine(); + writer.flush(); + final String clientHandshake = reader.readLine(); + if (!"{\"type\":\"start\"}".equals(clientHandshake)) { + throw new IllegalStateException("unexpected client handshake: " + clientHandshake); + } + clientStarted.countDown(); + writer.write("{\"event\":\"log\",\"level\":\"INFO\",\"source\":\"VM\",\"msg\":\"Boot complete\"}"); + writer.newLine(); + writer.flush(); + eventDelivered.countDown(); + } catch (Exception exception) { + throw new RuntimeException(exception); + } + } + + private boolean waitForRuntimeLog( + final StudioExecutionSessionService session, + final String fragment) throws InterruptedException { + for (int attempt = 0; attempt < 20; attempt += 1) { + if (session.snapshot().logs().stream().anyMatch(entry -> entry.message().contains(fragment))) { + return true; + } + Thread.sleep(50L); + } + return false; + } +}