OpenAPI and AsyncAPI Integration Overview
Architecture Diagram
graph TD
A[Client Application] -->|REST Requests| B[OpenAPI REST]
A -->|Event Subscription| C[AsyncAPI Events]
B -->|Synchronous Responses| A
C -->|Asynchronous Notifications| A
B -->|Publish Events| D[Message Broker]
D -->|Stream Events| C
Integration Patterns
1. Request-Response with Event Notification
sequenceDiagram
participant Client
participant REST_API
participant Event_API
participant Message_Broker
Client->>REST_API: POST /conversions/request-conversion
REST_API-->>Client: 202 Accepted (with operation ID)
REST_API->>Message_Broker: Publish CONVERSION_CREATED event
Message_Broker->>Event_API: Stream to subscribers
Event_API-->>Client: SSE CONVERSION_CREATED event
Client->>REST_API: GET /transactions/{id} (optional status check)
2. Event-Driven Workflow
sequenceDiagram
participant Client
participant Message_Broker
participant Worker
Client->>Message_Broker: Publish CONVERSION_REQUEST (ubyx.inbound)
Message_Broker->>Worker: Consume & Start Workflow
Worker->>Message_Broker: Publish CONVERSION_COMPLETED (ubyx.outbound)
Message_Broker->>Client: Consume Event (Notification)
Implementation Examples
JavaScript Client Integration
// 1. Initialize REST client
const restClient = {
baseUrl: 'https://api.ubyx-platform.com/api/v1/clearing',
headers: {
'Content-Type': 'application/json',
'X-Api-Key': 'your-api-key',
'X-Institution-Id': 'your-institution-id'
},
async requestConversion(conversionData) {
const response = await fetch(`${this.baseUrl}/conversions/request-conversion`, {
method: 'POST',
headers: this.headers,
body: JSON.stringify(conversionData)
});
return response.json();
}
};
// 2. Initialize Event client
const eventClient = {
initSSE() {
// SSE stream provides CloudEvents
const eventSource = new EventSource(`${restClient.baseUrl}/transactions/events`);
eventSource.onmessage = (event) => {
// 1. Parse CloudEvent Envelope
const cloudEvent = JSON.parse(event.data);
console.log('Received CloudEvent:', cloudEvent.type, cloudEvent.id);
// 2. Extract Business Payload
const payload = cloudEvent.data;
// 3. Handle Business Event Type (CONVERSION_CREATED, etc.)
const businessEventType = payload.eventType;
console.log('Processing Business Event:', businessEventType);
switch(businessEventType) {
case 'CONVERSION_CREATED':
handleConversionCreated(payload);
break;
case 'CONVERSION_COMPLETED':
handleSettlementCompleted(payload);
break;
// ... other event types
}
};
return eventSource;
}
};
// 3. Combined usage
async function processConversion() {
try {
// Start event listener
const eventSource = eventClient.initSSE();
// Request conversion
const conversionResponse = await restClient.requestConversion({
conversionType: 'REDEMPTION',
receivingInstitutionID: 'bank-a',
// ... other required fields
});
console.log('Conversion requested:', conversionResponse);
// Events will be handled by the SSE listener
} catch (error) {
console.error('Error:', error);
}
}
Java Message Broker Consumer
public class UbyxEventConsumer {
private final KafkaConsumer<String, String> consumer;
private final String institutionId;
public UbyxEventConsumer(String institutionId, String bootstrapServers) {
this.institutionId = institutionId;
// 1. Initialize Kafka properties
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", "ubyx-consumer-" + institutionId);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
this.consumer = new KafkaConsumer<>(props);
}
public void startConsuming() {
// Topic: ubyx.outbound (Partitioned by Institution ID)
consumer.subscribe(Collections.singletonList("ubyx.outbound"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
// Deserialize CloudEvent (Envelope)
JsonNode cloudEvent = objectMapper.readTree(record.value());
// Validate CloudEvent Spec
if (!cloudEvent.has("specversion") || !cloudEvent.get("specversion").asText().equals("1.0")) {
continue; // Skip non-CloudEvents
}
// Extract Business Data
JsonNode data = cloudEvent.get("data");
String eventType = data.get("eventType").asText();
processEvent(eventType, data);
} catch (Exception e) {
logError("Failed to process event", e);
}
}
}
}
private void processEvent(String eventType, JsonNode eventData) {
switch (eventType) {
case "CONVERSION_CREATED":
handleConversionCreated(eventData);
break;
case "ISSUER_APPROVED":
handleIssuerApproved(eventData);
break;
// ... other event types
default:
logUnknownEvent(eventType);
}
}
}
Best Practices
Authentication
// Always include required headers
const headers = {
'X-Api-Key': process.env.UBYX_API_KEY,
'X-Institution-Id': process.env.UBYX_INSTITUTION_ID,
'Content-Type': 'application/json'
};
// For SSE connections
const eventSource = new EventSource('/api/v1/clearing/transactions/events', {
headers: {
'X-Api-Key': process.env.UBYX_API_KEY,
'X-Institution-Id': process.env.UBYX_INSTITUTION_ID
}
});
Error Handling
// REST API error handling
async function safeApiCall(endpoint, options) {
try {
const response = await fetch(endpoint, options);
if (!response.ok) {
const errorData = await response.json();
throw new Error(`API Error: ${errorData.message || response.statusText}`);
}
return await response.json();
} catch (error) {
console.error('API call failed:', error);
// Implement retry logic for transient errors
if (isTransientError(error)) {
return retryWithBackoff(() => safeApiCall(endpoint, options), 3);
}
throw error;
}
}
// Event handling error recovery
eventSource.onerror = (error) => {
console.error('SSE error:', error);
// Implement reconnection logic
setTimeout(() => {
eventSource.close();
initSSE(); // Reconnect
}, 5000);
};
Data Validation
// Validate against OpenAPI schemas
import { validate } from 'jsonschema';
function validateConversionRequest(data) {
const schema = {
type: 'object',
required: [
'conversionType',
'receivingInstitutionID',
'issuerID',
'cashSettlerID',
'assetSettlerID',
'ownerMeta',
'assetID',
'quantity',
'currency'
],
properties: {
conversionType: { enum: ['REDEMPTION', 'MINTING'] },
quantity: { type: 'number', minimum: 0.01 },
currency: { type: 'string', pattern: '^[A-Z]{3}{{CONTENT}}#x27; }
// ... other property validations
}
};
const result = validate(data, schema);
if (!result.valid) {
throw new Error(`Validation failed: ${result.errors.join(', ')}`);
}
return true;
}
Deployment Patterns
Dockerized Consumer
FROM openjdk:17-jdk-slim
WORKDIR /app
COPY target/ubyx-event-consumer.jar /app/consumer.jar
COPY config/application.properties /app/
ENTRYPOINT ["java", "-jar", "consumer.jar"]
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: ubyx-event-consumer
spec:
replicas: 3
selector:
matchLabels:
app: ubyx-event-consumer
template:
metadata:
labels:
app: ubyx-event-consumer
spec:
containers:
- name: consumer
image: your-registry/ubyx-event-consumer:latest
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "kafka.ubyx-platform.com:9092"
- name: INSTITUTION_ID
valueFrom:
secretKeyRef:
name: ubyx-secrets
key: institution-id
- name: API_KEY
valueFrom:
secretKeyRef:
name: ubyx-secrets
key: api-key
Monitoring and Observability
Metrics Collection
// Track API performance
const startTime = Date.now();
const response = await fetch('/api/endpoint');
const duration = Date.now() - startTime;
// Log metrics
metrics.track('api_call_duration', duration, {
endpoint: '/api/endpoint',
status: response.status
});
// Track event processing
eventSource.onmessage = (event) => {
const processStart = Date.now();
// ... process event
const processDuration = Date.now() - processStart();
metrics.track('event_processing_duration', processDuration, {
eventType: event.data.eventType
});
};
Health Checks
// REST API health check
async function checkApiHealth() {
try {
const response = await fetch('/api/v1/clearing/health', {
headers: {
'X-Api-Key': process.env.UBYX_API_KEY,
'X-Institution-Id': process.env.UBYX_INSTITUTION_ID
}
});
return response.ok;
} catch (error) {
return false;
}
}
// Event stream health check
function checkEventStreamHealth() {
return new Promise((resolve) => {
const testEventSource = new EventSource('/api/v1/clearing/transactions/events');
const timeout = setTimeout(() => {
testEventSource.close();
resolve(false);
}, 5000);
testEventSource.onopen = () => {
clearTimeout(timeout);
testEventSource.close();
resolve(true);
};
testEventSource.onerror = () => {
clearTimeout(timeout);
testEventSource.close();
resolve(false);
};
});
}
Security Considerations
API Key Management
# Never commit API keys to version control
# Use environment variables or secret management
# .env file (add to .gitignore)
UBYX_API_KEY=your-api-key-here
UBYX_INSTITUTION_ID=your-institution-id
# In Kubernetes
apiVersion: v1
kind: Secret
metadata:
name: ubyx-api-keys
type: Opaque
data:
api-key: base64-encoded-key
institution-id: base64-encoded-id
Rate Limiting
// Implement client-side rate limiting
class RateLimiter {
constructor(limit, interval) {
this.limit = limit;
this.interval = interval;
this.queue = [];
}
async execute(fn) {
if (this.queue.length >= this.limit) {
await new Promise(resolve => setTimeout(resolve, this.interval));
}
this.queue.push(Date.now());
setTimeout(() => {
this.queue = this.queue.filter(t => Date.now() - t < this.interval);
}, this.interval);
return fn();
}
}
// Usage
const limiter = new RateLimiter(10, 1000); // 10 requests per second
await limiter.execute(() => fetch('/api/endpoint'));
This comprehensive integration guide provides patterns and examples for effectively combining REST API calls with asynchronous event processing in the Ubyx platform ecosystem.