Add runtime networking APIs

This commit is contained in:
gem
2026-06-09 16:09:19 +08:00
parent 4f36d68b74
commit 7b3c5cb0f5
20 changed files with 936 additions and 6 deletions

View File

@@ -0,0 +1,273 @@
import 'dart:async' as async;
import 'dart:convert';
import 'package:http/http.dart' as http;
import 'package:web_socket_channel/web_socket_channel.dart';
import '../diagnostics/runtime_diagnostics.dart';
import '../models/runtime_event.dart';
class RuntimeNetworkManager {
RuntimeNetworkManager({
required void Function(RuntimeEvent event) eventSink,
RuntimeDiagnostics? diagnostics,
http.Client? httpClient,
WebSocketChannel Function(Uri uri, {Iterable<String>? protocols})?
webSocketFactory,
}) : _eventSink = eventSink,
_diagnostics = diagnostics,
_httpClient = httpClient ?? http.Client(),
_ownsHttpClient = httpClient == null,
_webSocketFactory =
webSocketFactory ??
((uri, {protocols}) =>
WebSocketChannel.connect(uri, protocols: protocols));
final void Function(RuntimeEvent event) _eventSink;
final RuntimeDiagnostics? _diagnostics;
final http.Client _httpClient;
final bool _ownsHttpClient;
final WebSocketChannel Function(Uri uri, {Iterable<String>? protocols})
_webSocketFactory;
final Map<String, _RuntimeWebSocketConnection> _webSockets = {};
bool _disposed = false;
Future<void> httpRequest(RuntimeHttpRequest request) async {
if (_disposed) {
return;
}
try {
_ensureScheme(request.uri, const {'http', 'https'}, 'HTTP request');
final response = await _httpClient
.send(
http.Request(request.method, request.uri)
..headers.addAll(request.headers)
..body = request.body ?? '',
)
.timeout(request.timeout);
final body = await response.stream.bytesToString();
_emit(
RuntimeEvent(
type: RuntimeNetworkEventType.http,
data: {
'id': request.id,
'url': request.uri.toString(),
'method': request.method,
'ok': response.statusCode >= 200 && response.statusCode < 300,
'status': response.statusCode,
'headers': response.headers,
'body': body,
},
),
);
} catch (error) {
_diagnostics?.record(
type: RuntimeDiagnosticType.networkError,
message: 'Runtime HTTP request failed',
error: error,
context: {'id': request.id, 'url': request.uri.toString()},
);
_emit(
RuntimeEvent(
type: RuntimeNetworkEventType.http,
data: {
'id': request.id,
'url': request.uri.toString(),
'method': request.method,
'ok': false,
'error': error.toString(),
},
),
);
}
}
void wsConnect(RuntimeWebSocketConnectRequest request) {
if (_disposed) {
return;
}
try {
_ensureScheme(request.uri, const {'ws', 'wss'}, 'WebSocket connect');
closeWebSocket(request.id, emitClose: false);
final channel = _webSocketFactory(
request.uri,
protocols: request.protocols.isEmpty ? null : request.protocols,
);
final subscription = channel.stream.listen(
(message) {
_emit(
RuntimeEvent(
type: RuntimeNetworkEventType.wsMessage,
data: {
'id': request.id,
'url': request.uri.toString(),
'message': _webSocketMessageToString(message),
},
),
);
},
onError: (Object error) {
_diagnostics?.record(
type: RuntimeDiagnosticType.networkError,
message: 'Runtime WebSocket error',
error: error,
context: {'id': request.id, 'url': request.uri.toString()},
);
_emit(
RuntimeEvent(
type: RuntimeNetworkEventType.wsError,
data: {
'id': request.id,
'url': request.uri.toString(),
'error': error.toString(),
},
),
);
},
onDone: () {
_webSockets.remove(request.id);
_emit(
RuntimeEvent(
type: RuntimeNetworkEventType.wsClose,
data: {'id': request.id, 'url': request.uri.toString()},
),
);
},
);
_webSockets[request.id] = _RuntimeWebSocketConnection(
channel: channel,
subscription: subscription,
);
_emit(
RuntimeEvent(
type: RuntimeNetworkEventType.wsOpen,
data: {'id': request.id, 'url': request.uri.toString()},
),
);
} catch (error) {
_diagnostics?.record(
type: RuntimeDiagnosticType.networkError,
message: 'Runtime WebSocket connect failed',
error: error,
context: {'id': request.id, 'url': request.uri.toString()},
);
_emit(
RuntimeEvent(
type: RuntimeNetworkEventType.wsError,
data: {
'id': request.id,
'url': request.uri.toString(),
'error': error.toString(),
},
),
);
}
}
bool wsSend(String id, Object? message) {
final connection = _webSockets[id];
if (_disposed || connection == null) {
return false;
}
connection.channel.sink.add(message?.toString() ?? '');
return true;
}
bool closeWebSocket(String id, {bool emitClose = true}) {
final connection = _webSockets.remove(id);
if (connection == null) {
return false;
}
connection.subscription.cancel();
connection.channel.sink.close();
if (emitClose) {
_emit(
RuntimeEvent(type: RuntimeNetworkEventType.wsClose, data: {'id': id}),
);
}
return true;
}
void dispose() {
_disposed = true;
for (final id in _webSockets.keys.toList(growable: false)) {
closeWebSocket(id, emitClose: false);
}
if (_ownsHttpClient) {
_httpClient.close();
}
}
void _emit(RuntimeEvent event) {
if (_disposed) {
return;
}
_eventSink(event);
}
void _ensureScheme(Uri uri, Set<String> allowed, String label) {
if (!allowed.contains(uri.scheme)) {
throw FormatException(
'$label only supports ${allowed.join('/')} URLs: $uri',
);
}
}
String _webSocketMessageToString(Object? message) {
if (message is String) {
return message;
}
if (message is List<int>) {
return base64Encode(message);
}
return message?.toString() ?? '';
}
}
class RuntimeHttpRequest {
const RuntimeHttpRequest({
required this.id,
required this.method,
required this.uri,
this.headers = const {},
this.body,
this.timeout = const Duration(seconds: 15),
});
final String id;
final String method;
final Uri uri;
final Map<String, String> headers;
final String? body;
final Duration timeout;
}
class RuntimeWebSocketConnectRequest {
const RuntimeWebSocketConnectRequest({
required this.id,
required this.uri,
this.protocols = const [],
});
final String id;
final Uri uri;
final List<String> protocols;
}
abstract final class RuntimeNetworkEventType {
static const http = 'network_http';
static const wsOpen = 'network_ws_open';
static const wsMessage = 'network_ws_message';
static const wsError = 'network_ws_error';
static const wsClose = 'network_ws_close';
}
class _RuntimeWebSocketConnection {
const _RuntimeWebSocketConnection({
required this.channel,
required this.subscription,
});
final WebSocketChannel channel;
final async.StreamSubscription<dynamic> subscription;
}