implements PLN-0044 runtime handshake integration
This commit is contained in:
parent
16503b2575
commit
17f9a190d5
@ -0,0 +1,19 @@
|
||||
package p.studio.debug.runtime;
|
||||
|
||||
public record StudioRuntimeDebugConnectionSettings(
|
||||
String host,
|
||||
int port) {
|
||||
public static final String DEFAULT_HOST = "127.0.0.1";
|
||||
public static final int DEFAULT_PORT = 7777;
|
||||
|
||||
public StudioRuntimeDebugConnectionSettings {
|
||||
host = host == null || host.isBlank() ? DEFAULT_HOST : host.trim();
|
||||
if (port <= 0) {
|
||||
port = DEFAULT_PORT;
|
||||
}
|
||||
}
|
||||
|
||||
public static StudioRuntimeDebugConnectionSettings defaults() {
|
||||
return new StudioRuntimeDebugConnectionSettings(DEFAULT_HOST, DEFAULT_PORT);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,13 @@
|
||||
package p.studio.debug.runtime;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public record StudioRuntimeHandshakeClientToRuntime(
|
||||
@JsonProperty("type") String type) {
|
||||
|
||||
public StudioRuntimeHandshakeClientToRuntime {
|
||||
if (!"start".equals(type) && !"ok".equals(type)) {
|
||||
throw new IllegalArgumentException("type must be 'start' or 'ok'");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,22 @@
|
||||
package p.studio.debug.runtime;
|
||||
|
||||
public record StudioRuntimeHandshakeResult(
|
||||
boolean success,
|
||||
StudioRuntimeDebugConnectionSettings settings,
|
||||
Integer protocolVersion,
|
||||
String runtimeVersion,
|
||||
String failureMessage) {
|
||||
|
||||
public static StudioRuntimeHandshakeResult success(
|
||||
final StudioRuntimeDebugConnectionSettings settings,
|
||||
final int protocolVersion,
|
||||
final String runtimeVersion) {
|
||||
return new StudioRuntimeHandshakeResult(true, settings, protocolVersion, runtimeVersion, null);
|
||||
}
|
||||
|
||||
public static StudioRuntimeHandshakeResult failure(
|
||||
final StudioRuntimeDebugConnectionSettings settings,
|
||||
final String failureMessage) {
|
||||
return new StudioRuntimeHandshakeResult(false, settings, null, null, failureMessage);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,17 @@
|
||||
package p.studio.debug.runtime;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
public record StudioRuntimeHandshakeRuntimeToClient(
|
||||
@JsonProperty("type") String type,
|
||||
@JsonProperty("protocol_version") int protocolVersion,
|
||||
@JsonProperty("runtime_version") String runtimeVersion) {
|
||||
|
||||
public StudioRuntimeHandshakeRuntimeToClient {
|
||||
if (!"handshake".equals(type)) {
|
||||
throw new IllegalArgumentException("type must be 'handshake'");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,241 @@
|
||||
package p.studio.debug.runtime;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import p.studio.Container;
|
||||
import p.studio.StudioBackgroundTasks;
|
||||
import p.studio.execution.StudioExecutionLogSeverity;
|
||||
import p.studio.execution.StudioExecutionSessionService;
|
||||
import p.studio.execution.StudioExecutionState;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
public final class StudioRuntimeHandshakeService {
|
||||
private final ObjectMapper mapper;
|
||||
private final StudioBackgroundTasks backgroundTasks;
|
||||
private final Object monitor = new Object();
|
||||
private Socket socket;
|
||||
private BufferedReader reader;
|
||||
private BufferedWriter writer;
|
||||
private Future<?> receiverTask;
|
||||
private boolean intentionalDisconnect;
|
||||
|
||||
public StudioRuntimeHandshakeService() {
|
||||
this(Container.mapper(), Container.backgroundTasks());
|
||||
}
|
||||
|
||||
public StudioRuntimeHandshakeService(
|
||||
final ObjectMapper mapper,
|
||||
final StudioBackgroundTasks backgroundTasks) {
|
||||
this.mapper = Objects.requireNonNull(mapper, "mapper");
|
||||
this.backgroundTasks = Objects.requireNonNull(backgroundTasks, "backgroundTasks");
|
||||
}
|
||||
|
||||
public StudioRuntimeHandshakeResult connect(final StudioExecutionSessionService session) {
|
||||
return connect(session, StudioRuntimeDebugConnectionSettings.defaults());
|
||||
}
|
||||
|
||||
public StudioRuntimeHandshakeResult connect(
|
||||
final StudioExecutionSessionService session,
|
||||
final StudioRuntimeDebugConnectionSettings settings) {
|
||||
final StudioExecutionSessionService safeSession = Objects.requireNonNull(session, "session");
|
||||
final StudioRuntimeDebugConnectionSettings safeSettings = Objects.requireNonNull(settings, "settings");
|
||||
safeSession.transitionToConnecting();
|
||||
try {
|
||||
synchronized (monitor) {
|
||||
cleanupLocked();
|
||||
intentionalDisconnect = false;
|
||||
socket = new Socket();
|
||||
socket.connect(new InetSocketAddress(safeSettings.host(), safeSettings.port()), 5_000);
|
||||
socket.setSoTimeout(5_000);
|
||||
reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
|
||||
writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
final String handshakeLine = readHandshakeLine();
|
||||
final StudioRuntimeHandshakeRuntimeToClient handshake = mapper.readValue(
|
||||
handshakeLine,
|
||||
StudioRuntimeHandshakeRuntimeToClient.class);
|
||||
writeMessage(mapper.writeValueAsString(new StudioRuntimeHandshakeClientToRuntime("start")));
|
||||
synchronized (monitor) {
|
||||
if (socket != null) {
|
||||
socket.setSoTimeout(0);
|
||||
}
|
||||
receiverTask = backgroundTasks.submit(() -> receiveLoop(safeSession));
|
||||
}
|
||||
|
||||
safeSession.appendRuntimeLog(
|
||||
StudioExecutionLogSeverity.INFO,
|
||||
"Runtime handshake connected to %s:%d".formatted(safeSettings.host(), safeSettings.port()));
|
||||
safeSession.appendRuntimeLog(
|
||||
StudioExecutionLogSeverity.INFO,
|
||||
"Runtime protocol %d (%s)".formatted(handshake.protocolVersion(), handshake.runtimeVersion()));
|
||||
safeSession.transitionToRunning();
|
||||
return StudioRuntimeHandshakeResult.success(safeSettings, handshake.protocolVersion(), handshake.runtimeVersion());
|
||||
} catch (Exception exception) {
|
||||
cleanup();
|
||||
safeSession.appendRuntimeLog(
|
||||
StudioExecutionLogSeverity.ERROR,
|
||||
"Runtime handshake failed: " + failureMessage(exception));
|
||||
safeSession.transitionToRuntimeFailed();
|
||||
return StudioRuntimeHandshakeResult.failure(safeSettings, failureMessage(exception));
|
||||
}
|
||||
}
|
||||
|
||||
public void disconnect(final StudioExecutionSessionService session) {
|
||||
final StudioExecutionSessionService safeSession = Objects.requireNonNull(session, "session");
|
||||
final boolean wasConnected;
|
||||
synchronized (monitor) {
|
||||
wasConnected = socket != null;
|
||||
intentionalDisconnect = true;
|
||||
cleanupLocked();
|
||||
}
|
||||
if (wasConnected) {
|
||||
safeSession.appendRuntimeLog(StudioExecutionLogSeverity.INFO, "Runtime debug connection disconnected.");
|
||||
final StudioExecutionState currentState = safeSession.snapshot().state();
|
||||
if (currentState == StudioExecutionState.CONNECTING || currentState == StudioExecutionState.RUNNING) {
|
||||
safeSession.transitionToStopped();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void receiveLoop(final StudioExecutionSessionService session) {
|
||||
try {
|
||||
String line;
|
||||
while ((line = readRuntimeLine()) != null) {
|
||||
processRuntimeLine(session, line);
|
||||
}
|
||||
handleUnexpectedDisconnect(session, "Runtime debug connection closed.");
|
||||
} catch (IOException ioException) {
|
||||
handleUnexpectedDisconnect(session, "Runtime debug connection lost: " + failureMessage(ioException));
|
||||
}
|
||||
}
|
||||
|
||||
private void processRuntimeLine(
|
||||
final StudioExecutionSessionService session,
|
||||
final String line) throws IOException {
|
||||
final JsonNode node = mapper.readTree(line);
|
||||
if (node == null || !node.has("event")) {
|
||||
return;
|
||||
}
|
||||
if ("log".equals(node.get("event").asText())) {
|
||||
final StudioRuntimeLogEvent event = mapper.treeToValue(node, StudioRuntimeLogEvent.class);
|
||||
session.appendRuntimeLog(
|
||||
severityOf(event.level()),
|
||||
renderRuntimeMessage(event));
|
||||
}
|
||||
}
|
||||
|
||||
private String renderRuntimeMessage(final StudioRuntimeLogEvent event) {
|
||||
if (event.source() == null || event.source().isBlank()) {
|
||||
return event.message();
|
||||
}
|
||||
return "[%s] %s".formatted(event.source(), event.message());
|
||||
}
|
||||
|
||||
private StudioExecutionLogSeverity severityOf(final String level) {
|
||||
if (level == null) {
|
||||
return StudioExecutionLogSeverity.INFO;
|
||||
}
|
||||
return switch (level.trim().toUpperCase()) {
|
||||
case "ERROR", "FATAL" -> StudioExecutionLogSeverity.ERROR;
|
||||
default -> StudioExecutionLogSeverity.INFO;
|
||||
};
|
||||
}
|
||||
|
||||
private String readHandshakeLine() throws IOException {
|
||||
synchronized (monitor) {
|
||||
if (reader == null) {
|
||||
throw new IOException("runtime debug reader not available");
|
||||
}
|
||||
final String line = reader.readLine();
|
||||
if (line == null) {
|
||||
throw new IOException("connection closed during handshake");
|
||||
}
|
||||
return line;
|
||||
}
|
||||
}
|
||||
|
||||
private String readRuntimeLine() throws IOException {
|
||||
synchronized (monitor) {
|
||||
if (reader == null) {
|
||||
return null;
|
||||
}
|
||||
return reader.readLine();
|
||||
}
|
||||
}
|
||||
|
||||
private void writeMessage(final String message) throws IOException {
|
||||
synchronized (monitor) {
|
||||
if (writer == null) {
|
||||
throw new IOException("runtime debug writer not available");
|
||||
}
|
||||
writer.write(message);
|
||||
writer.newLine();
|
||||
writer.flush();
|
||||
}
|
||||
}
|
||||
|
||||
private void handleUnexpectedDisconnect(
|
||||
final StudioExecutionSessionService session,
|
||||
final String message) {
|
||||
final boolean shouldReport;
|
||||
synchronized (monitor) {
|
||||
shouldReport = !intentionalDisconnect;
|
||||
cleanupLocked();
|
||||
}
|
||||
if (!shouldReport) {
|
||||
return;
|
||||
}
|
||||
session.appendRuntimeLog(StudioExecutionLogSeverity.ERROR, message);
|
||||
final StudioExecutionState currentState = session.snapshot().state();
|
||||
if (currentState == StudioExecutionState.CONNECTING || currentState == StudioExecutionState.RUNNING) {
|
||||
session.transitionToRuntimeFailed();
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanup() {
|
||||
synchronized (monitor) {
|
||||
cleanupLocked();
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanupLocked() {
|
||||
if (receiverTask != null) {
|
||||
receiverTask.cancel(true);
|
||||
receiverTask = null;
|
||||
}
|
||||
closeQuietly(reader);
|
||||
closeQuietly(writer);
|
||||
closeQuietly(socket);
|
||||
reader = null;
|
||||
writer = null;
|
||||
socket = null;
|
||||
}
|
||||
|
||||
private void closeQuietly(final AutoCloseable closeable) {
|
||||
if (closeable == null) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
closeable.close();
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
}
|
||||
|
||||
private String failureMessage(final Exception exception) {
|
||||
final String message = exception.getMessage();
|
||||
return message == null || message.isBlank()
|
||||
? exception.getClass().getSimpleName()
|
||||
: message;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,12 @@
|
||||
package p.studio.debug.runtime;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
public record StudioRuntimeLogEvent(
|
||||
@JsonProperty("event") String event,
|
||||
@JsonProperty("level") String level,
|
||||
@JsonProperty("source") String source,
|
||||
@JsonProperty("msg") String message) {
|
||||
}
|
||||
@ -0,0 +1,108 @@
|
||||
package p.studio.debug.runtime;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import p.studio.StudioBackgroundTasks;
|
||||
import p.studio.execution.StudioExecutionSessionService;
|
||||
import p.studio.execution.StudioExecutionState;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
final class StudioRuntimeHandshakeServiceTest {
|
||||
@Test
|
||||
void connectsPublishesRuntimeHandshakeAndStreamsLogEvents() throws Exception {
|
||||
try (final ServerSocket serverSocket = new ServerSocket(0)) {
|
||||
final CountDownLatch clientStarted = new CountDownLatch(1);
|
||||
final CountDownLatch eventDelivered = new CountDownLatch(1);
|
||||
final Thread serverThread = new Thread(() -> serveSuccessfulHandshake(serverSocket, clientStarted, eventDelivered));
|
||||
serverThread.start();
|
||||
|
||||
final StudioExecutionSessionService session = new StudioExecutionSessionService();
|
||||
session.beginPreparationCycle();
|
||||
final StudioRuntimeDebugConnectionSettings settings = new StudioRuntimeDebugConnectionSettings("127.0.0.1", serverSocket.getLocalPort());
|
||||
final StudioBackgroundTasks backgroundTasks = new StudioBackgroundTasks(Executors.newSingleThreadExecutor());
|
||||
try {
|
||||
final StudioRuntimeHandshakeService service = new StudioRuntimeHandshakeService(new ObjectMapper(), backgroundTasks);
|
||||
final StudioRuntimeHandshakeResult result = service.connect(session, settings);
|
||||
|
||||
assertTrue(result.success());
|
||||
assertEquals(StudioExecutionState.RUNNING, session.snapshot().state());
|
||||
assertTrue(clientStarted.await(2, TimeUnit.SECONDS));
|
||||
assertTrue(eventDelivered.await(2, TimeUnit.SECONDS));
|
||||
assertTrue(session.snapshot().logs().stream().anyMatch(entry -> entry.message().contains("Runtime handshake connected")));
|
||||
assertTrue(waitForRuntimeLog(session, "[VM] Boot complete"));
|
||||
|
||||
service.disconnect(session);
|
||||
} finally {
|
||||
backgroundTasks.shutdown();
|
||||
serverThread.join(2_000);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void handshakeFailureTransitionsSessionToRuntimeFailed() {
|
||||
final StudioExecutionSessionService session = new StudioExecutionSessionService();
|
||||
session.beginPreparationCycle();
|
||||
final StudioBackgroundTasks backgroundTasks = new StudioBackgroundTasks(Executors.newSingleThreadExecutor());
|
||||
try {
|
||||
final StudioRuntimeHandshakeService service = new StudioRuntimeHandshakeService(new ObjectMapper(), backgroundTasks);
|
||||
final StudioRuntimeHandshakeResult result = service.connect(
|
||||
session,
|
||||
new StudioRuntimeDebugConnectionSettings("127.0.0.1", 1));
|
||||
|
||||
assertFalse(result.success());
|
||||
assertEquals(StudioExecutionState.RUNTIME_FAILED, session.snapshot().state());
|
||||
assertTrue(session.snapshot().logs().stream().anyMatch(entry -> entry.message().contains("Runtime handshake failed")));
|
||||
} finally {
|
||||
backgroundTasks.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private void serveSuccessfulHandshake(
|
||||
final ServerSocket serverSocket,
|
||||
final CountDownLatch clientStarted,
|
||||
final CountDownLatch eventDelivered) {
|
||||
try (Socket socket = serverSocket.accept();
|
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
|
||||
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8))) {
|
||||
writer.write("{\"type\":\"handshake\",\"protocol_version\":1,\"runtime_version\":\"1.0.0\"}");
|
||||
writer.newLine();
|
||||
writer.flush();
|
||||
final String clientHandshake = reader.readLine();
|
||||
if (!"{\"type\":\"start\"}".equals(clientHandshake)) {
|
||||
throw new IllegalStateException("unexpected client handshake: " + clientHandshake);
|
||||
}
|
||||
clientStarted.countDown();
|
||||
writer.write("{\"event\":\"log\",\"level\":\"INFO\",\"source\":\"VM\",\"msg\":\"Boot complete\"}");
|
||||
writer.newLine();
|
||||
writer.flush();
|
||||
eventDelivered.countDown();
|
||||
} catch (Exception exception) {
|
||||
throw new RuntimeException(exception);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean waitForRuntimeLog(
|
||||
final StudioExecutionSessionService session,
|
||||
final String fragment) throws InterruptedException {
|
||||
for (int attempt = 0; attempt < 20; attempt += 1) {
|
||||
if (session.snapshot().logs().stream().anyMatch(entry -> entry.message().contains(fragment))) {
|
||||
return true;
|
||||
}
|
||||
Thread.sleep(50L);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user