OpenAPI and AsyncAPI Integration Overview

OpenAPI and AsyncAPI Integration Overview

Architecture Diagram

graph TD
    A[Client Application] -->|REST Requests| B[OpenAPI REST]
    A -->|Event Subscription| C[AsyncAPI Events]
    B -->|Synchronous Responses| A
    C -->|Asynchronous Notifications| A
    B -->|Publish Events| D[Message Broker]
    D -->|Stream Events| C

Integration Patterns

1. Request-Response with Event Notification

sequenceDiagram
    participant Client
    participant REST_API
    participant Event_API
    participant Message_Broker
    Client->>REST_API: POST /conversions/request-conversion
    REST_API-->>Client: 202 Accepted (with operation ID)
    REST_API->>Message_Broker: Publish CONVERSION_CREATED event
    Message_Broker->>Event_API: Stream to subscribers
    Event_API-->>Client: SSE CONVERSION_CREATED event
    Client->>REST_API: GET /transactions/{id} (optional status check)

2. Event-Driven Workflow

sequenceDiagram
    participant Client
    participant Message_Broker
    participant Worker
    Client->>Message_Broker: Publish CONVERSION_REQUEST (ubyx.inbound)
    Message_Broker->>Worker: Consume & Start Workflow
    Worker->>Message_Broker: Publish CONVERSION_COMPLETED (ubyx.outbound)
    Message_Broker->>Client: Consume Event (Notification)

Implementation Examples

JavaScript Client Integration

// 1. Initialize REST client
/**
 * Minimal REST client for the clearing API.
 *
 * `baseUrl` and `headers` are sent with every request; replace the
 * placeholder credentials before use.
 */
const restClient = {
  baseUrl: 'https://api.ubyx-platform.com/api/v1/clearing',
  headers: {
    'Content-Type': 'application/json',
    'X-Api-Key': 'your-api-key',
    'X-Institution-Id': 'your-institution-id'
  },

  /**
   * POSTs a conversion request and returns the parsed JSON response body.
   *
   * @param {object} conversionData - Request payload, serialized as JSON.
   * @returns {Promise<object>} Parsed body of a successful (2xx) response.
   * @throws {Error} When the server answers with a non-2xx status; the
   *   error carries `status` and, when available, the parsed `body`.
   */
  async requestConversion(conversionData) {
    const response = await fetch(`${this.baseUrl}/conversions/request-conversion`, {
      method: 'POST',
      headers: this.headers,
      body: JSON.stringify(conversionData)
    });

    if (!response.ok) {
      // Error bodies are not guaranteed to be JSON (e.g. a proxy's HTML page),
      // so a parse failure falls back to the HTTP status text.
      let errorBody = null;
      try {
        errorBody = await response.json();
      } catch (parseError) {
        errorBody = null;
      }
      const detail = (errorBody && errorBody.message) || response.statusText;
      const error = new Error(`Conversion request failed (${response.status}): ${detail}`);
      error.status = response.status;
      error.body = errorBody;
      throw error;
    }

    return response.json();
  }
};

// 2. Initialize Event client
/**
 * Server-Sent Events client for the transaction event stream.
 */
const eventClient = {
  /**
   * Opens the SSE stream at `${restClient.baseUrl}/transactions/events` and
   * dispatches each received business event to its handler.
   *
   * Each message is expected to be a JSON CloudEvent whose `data` member is
   * the business payload carrying an `eventType`. Messages that cannot be
   * parsed, or that have no object payload, are logged and skipped so one
   * bad message does not throw out of the event handler.
   *
   * @returns {EventSource} The open event source; call `close()` to stop.
   */
  initSSE() {
    // SSE stream provides CloudEvents
    const eventSource = new EventSource(`${restClient.baseUrl}/transactions/events`);

    eventSource.onmessage = (event) => {
      // 1. Parse CloudEvent Envelope
      let cloudEvent;
      try {
        cloudEvent = JSON.parse(event.data);
      } catch (error) {
        console.error('Discarding malformed SSE message:', error);
        return;
      }

      if (!cloudEvent || typeof cloudEvent.data !== 'object' || cloudEvent.data === null) {
        console.error('Discarding SSE message without a CloudEvent data payload:', cloudEvent);
        return;
      }
      console.log('Received CloudEvent:', cloudEvent.type, cloudEvent.id);

      // 2. Extract Business Payload
      const payload = cloudEvent.data;

      // 3. Handle Business Event Type (CONVERSION_CREATED, etc.)
      const businessEventType = payload.eventType;

      console.log('Processing Business Event:', businessEventType);

      switch (businessEventType) {
        case 'CONVERSION_CREATED':
          handleConversionCreated(payload);
          break;
        case 'CONVERSION_COMPLETED':
          // NOTE(review): CONVERSION_COMPLETED is routed to the settlement
          // handler — confirm this is the intended handler.
          handleSettlementCompleted(payload);
          break;
        // ... other event types
        default:
          console.warn('Unhandled business event type:', businessEventType);
      }
    };
    return eventSource;
  }
};

// 3. Combined usage
/**
 * Opens the SSE listener, then submits a conversion request.
 *
 * The listener is started before the request so that events published for
 * the new conversion are not missed. Errors are logged rather than rethrown.
 *
 * @returns {Promise<EventSource|undefined>} The open event source on success
 *   (the caller closes it when done), or `undefined` when the request failed,
 *   in which case the event source has already been closed.
 */
async function processConversion() {
  let eventSource;
  try {
    // Start event listener
    eventSource = eventClient.initSSE();

    // Request conversion
    const conversionResponse = await restClient.requestConversion({
      conversionType: 'REDEMPTION',
      receivingInstitutionID: 'bank-a',
      // ... other required fields
    });

    console.log('Conversion requested:', conversionResponse);

    // Events will be handled by the SSE listener
    return eventSource;
  } catch (error) {
    console.error('Error:', error);
    // Nothing else holds a reference to the stream, so close it here
    // instead of leaving an open connection behind.
    if (eventSource) {
      eventSource.close();
    }
    return undefined;
  }
}

Java Message Broker Consumer

/**
 * Kafka consumer that reads JSON CloudEvents from the {@code ubyx.outbound}
 * topic and dispatches the contained business events by their
 * {@code eventType}.
 *
 * <p>NOTE(review): {@code objectMapper}, {@code logError},
 * {@code logUnknownEvent} and the {@code handle*} methods are referenced but
 * not declared in this snippet — presumably provided elsewhere; confirm.
 */
public class UbyxEventConsumer {
    private final KafkaConsumer<String, String> consumer;
    private final String institutionId;

    /**
     * Creates a consumer in the group {@code "ubyx-consumer-" + institutionId}.
     *
     * @param institutionId    institution identifier, used to build the group id
     * @param bootstrapServers value for the Kafka {@code bootstrap.servers} setting
     */
    public UbyxEventConsumer(String institutionId, String bootstrapServers) {
        this.institutionId = institutionId;

        // 1. Initialize Kafka properties
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "ubyx-consumer-" + institutionId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // With no committed offset, start from the beginning of the topic.
        props.put("auto.offset.reset", "earliest");

        this.consumer = new KafkaConsumer<>(props);
    }

    /**
     * Subscribes to {@code ubyx.outbound} and polls indefinitely, dispatching
     * every valid CloudEvent. A failure while handling one record is logged
     * and does not stop the loop. If polling itself throws, the consumer is
     * closed before the exception propagates.
     */
    public void startConsuming() {
        // Topic: ubyx.outbound (Partitioned by Institution ID)
        consumer.subscribe(Collections.singletonList("ubyx.outbound"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // Deserialize CloudEvent (Envelope)
                        JsonNode cloudEvent = objectMapper.readTree(record.value());

                        // Validate CloudEvent Spec
                        if (!cloudEvent.has("specversion") || !cloudEvent.get("specversion").asText().equals("1.0")) {
                            continue; // Skip non-CloudEvents
                        }

                        // Extract Business Data
                        JsonNode data = cloudEvent.get("data");
                        String eventType = data.get("eventType").asText();

                        processEvent(eventType, data);
                    } catch (Exception e) {
                        // Covers malformed JSON and envelopes without data/eventType.
                        logError("Failed to process event", e);
                    }
                }
            }
        } finally {
            // The loop only exits through an exception thrown by poll();
            // release the consumer's network resources and group membership.
            consumer.close();
        }
    }

    /**
     * Routes a business event to its handler; unknown types are logged.
     *
     * @param eventType value of the payload's {@code eventType} field
     * @param eventData the CloudEvent {@code data} node
     */
    private void processEvent(String eventType, JsonNode eventData) {
        switch (eventType) {
            case "CONVERSION_CREATED":
                handleConversionCreated(eventData);
                break;
            case "ISSUER_APPROVED":
                handleIssuerApproved(eventData);
                break;
            // ... other event types
            default:
                logUnknownEvent(eventType);
        }
    }
}

Best Practices

Authentication

// Always include required headers.
// Credentials are read from the environment rather than hard-coded.
const headers = {
  'X-Api-Key': process.env.UBYX_API_KEY,
  'X-Institution-Id': process.env.UBYX_INSTITUTION_ID,
  'Content-Type': 'application/json'
};

// For SSE connections
// NOTE(review): the standard EventSource constructor only accepts
// `{ withCredentials }` as its second argument and has no `headers` option.
// This snippet presumably targets a library implementation that supports
// custom headers — confirm which one.
const eventSource = new EventSource('/api/v1/clearing/transactions/events', {
  headers: {
    'X-Api-Key': process.env.UBYX_API_KEY,
    'X-Institution-Id': process.env.UBYX_INSTITUTION_ID
  }
});

Error Handling

// REST API error handling
/**
 * Performs exactly one HTTP attempt and returns the parsed JSON body.
 *
 * @param {string} endpoint - URL passed to `fetch`.
 * @param {object} [options] - Init options passed to `fetch`.
 * @returns {Promise<*>} Parsed JSON body of a successful (2xx) response.
 * @throws {Error} `API Error: <message>` with a `status` property on a
 *   non-2xx response; network errors from `fetch` propagate unchanged.
 */
async function attemptApiCall(endpoint, options) {
  const response = await fetch(endpoint, options);

  if (!response.ok) {
    // Error bodies are not always JSON; a parse failure falls back to the
    // HTTP status text instead of hiding the real error.
    let errorData = {};
    try {
      errorData = (await response.json()) || {};
    } catch (parseError) {
      errorData = {};
    }
    const error = new Error(`API Error: ${errorData.message || response.statusText}`);
    error.status = response.status;
    throw error;
  }

  return await response.json();
}

/**
 * Calls the API once and, if the failure is transient, retries it up to
 * three more times with backoff.
 *
 * The retry callback runs the single-attempt helper, not this function, so
 * retries cannot nest and the total number of requests stays bounded.
 *
 * @param {string} endpoint - URL passed to `fetch`.
 * @param {object} [options] - Init options passed to `fetch`.
 * @returns {Promise<*>} Parsed JSON body of the first successful attempt.
 * @throws {Error} The original error when it is not transient, or whatever
 *   `retryWithBackoff` rejects with once retries are exhausted.
 */
async function safeApiCall(endpoint, options) {
  try {
    return await attemptApiCall(endpoint, options);
  } catch (error) {
    console.error('API call failed:', error);
    // Implement retry logic for transient errors
    if (isTransientError(error)) {
      return retryWithBackoff(() => attemptApiCall(endpoint, options), 3);
    }
    throw error;
  }
}

// Event handling error recovery
/**
 * Installs an error handler that replaces a failed SSE connection with a
 * fresh one after a 5 second delay.
 *
 * The failed source is closed immediately so that its own automatic
 * reconnection cannot run alongside the manual one and no further error
 * events can schedule extra reconnect timers. The replacement source gets
 * the same handler, so recovery keeps working after the first failure.
 *
 * @param {EventSource} source - Event source to watch.
 * @returns {EventSource} The same `source`, for chaining.
 */
function attachSseReconnect(source) {
  source.onerror = (error) => {
    console.error('SSE error:', error);
    source.close();
    setTimeout(() => {
      attachSseReconnect(eventClient.initSSE()); // Reconnect
    }, 5000);
  };
  return source;
}

attachSseReconnect(eventSource);

Data Validation

// Validate against OpenAPI schemas
import { validate } from 'jsonschema';

/**
 * Validates a conversion request payload against a JSON Schema.
 *
 * @param {object} data - Candidate conversion request.
 * @returns {boolean} `true` when `data` satisfies the schema.
 * @throws {Error} `Validation failed: ...` listing every schema violation.
 */
function validateConversionRequest(data) {
  const schema = {
    type: 'object',
    required: [
      'conversionType',
      'receivingInstitutionID',
      'issuerID',
      'cashSettlerID',
      'assetSettlerID',
      'ownerMeta',
      'assetID',
      'quantity',
      'currency'
    ],
    properties: {
      conversionType: { enum: ['REDEMPTION', 'MINTING'] },
      quantity: { type: 'number', minimum: 0.01 },
      // Exactly three uppercase letters, anchored at both ends.
      currency: { type: 'string', pattern: '^[A-Z]{3}$' }
      // ... other property validations
    }
  };

  const result = validate(data, schema);
  if (!result.valid) {
    throw new Error(`Validation failed: ${result.errors.join(', ')}`);
  }

  return true;
}

Deployment Patterns

Dockerized Consumer

# Runtime image for the Ubyx event consumer.
# eclipse-temurin replaces the deprecated official "openjdk" image. A JRE is
# normally enough to run a packaged jar; switch to eclipse-temurin:17-jdk if
# the application needs JDK tooling at runtime.
FROM eclipse-temurin:17-jre

WORKDIR /app

COPY target/ubyx-event-consumer.jar /app/consumer.jar
COPY config/application.properties /app/

# Run as an unprivileged user instead of root.
RUN useradd --system --no-create-home consumer
USER consumer

# Relative path resolves against WORKDIR (/app).
ENTRYPOINT ["java", "-jar", "consumer.jar"]

Kubernetes Deployment

# Deployment running three replicas of the Ubyx event consumer.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ubyx-event-consumer
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ubyx-event-consumer
  template:
    metadata:
      labels:
        app: ubyx-event-consumer
    spec:
      containers:
      - name: consumer
        # Prefer a pinned version tag over :latest for reproducible rollouts.
        image: your-registry/ubyx-event-consumer:latest
        env:
        - name: KAFKA_BOOTSTRAP_SERVERS
          value: "kafka.ubyx-platform.com:9092"
        # Credentials come from the "ubyx-api-keys" Secret; the name must match
        # the Secret manifest's metadata.name or the pods will fail to start.
        - name: INSTITUTION_ID
          valueFrom:
            secretKeyRef:
              name: ubyx-api-keys
              key: institution-id
        - name: API_KEY
          valueFrom:
            secretKeyRef:
              name: ubyx-api-keys
              key: api-key

Monitoring and Observability

Metrics Collection

// Track API performance
// Measure the wall-clock duration of one API call.
const startTime = Date.now();
const response = await fetch('/api/endpoint');
const duration = Date.now() - startTime;

// Log metrics
metrics.track('api_call_duration', duration, {
  endpoint: '/api/endpoint',
  status: response.status
});

// Track event processing
eventSource.onmessage = (event) => {
  const processStart = Date.now();
  // SSE delivers event.data as a string; parse the CloudEvent envelope to
  // reach the business payload and its eventType.
  const cloudEvent = JSON.parse(event.data);
  // ... process event
  // processStart is a timestamp (number), not a function.
  const processDuration = Date.now() - processStart;

  metrics.track('event_processing_duration', processDuration, {
    eventType: cloudEvent.data.eventType
  });
};

Health Checks

// REST API health check
/**
 * Probes the clearing API health endpoint with the configured credentials.
 *
 * @returns {Promise<boolean>} The response's `ok` flag, or `false` when the
 *   request itself fails (for example on a network error).
 */
async function checkApiHealth() {
  let healthy = false;
  try {
    const authHeaders = {
      'X-Api-Key': process.env.UBYX_API_KEY,
      'X-Institution-Id': process.env.UBYX_INSTITUTION_ID
    };
    const { ok } = await fetch('/api/v1/clearing/health', { headers: authHeaders });
    healthy = ok;
  } catch (error) {
    // Any failure to reach the endpoint counts as unhealthy.
    healthy = false;
  }
  return healthy;
}

// Event stream health check
/**
 * Probes the SSE endpoint by opening a short-lived connection.
 *
 * @returns {Promise<boolean>} Resolves `true` once the stream opens, and
 *   `false` on an error event or when nothing happens within 5 seconds.
 *   The probe connection is closed in every case; the promise never rejects.
 */
function checkEventStreamHealth() {
  return new Promise((resolve) => {
    const probe = new EventSource('/api/v1/clearing/transactions/events');

    // Single exit path: stop the watchdog, drop the connection, report.
    const finish = (healthy) => {
      clearTimeout(watchdog);
      probe.close();
      resolve(healthy);
    };

    const watchdog = setTimeout(() => finish(false), 5000);

    probe.onopen = () => finish(true);
    probe.onerror = () => finish(false);
  });
}

Security Considerations

API Key Management

# Never commit API keys to version control.
# Use environment variables or a secret manager instead.

# .env file for local development (add to .gitignore)
UBYX_API_KEY=your-api-key-here
UBYX_INSTITUTION_ID=your-institution-id

# In Kubernetes: a Secret manifest holding the same two values.
# Values under `data` must be base64-encoded; the strings below are placeholders.
apiVersion: v1
kind: Secret
metadata:
  name: ubyx-api-keys
type: Opaque
data:
  api-key: base64-encoded-key
  institution-id: base64-encoded-id

Rate Limiting

// Implement client-side rate limiting
/**
 * Client-side sliding-window rate limiter: at most `limit` calls are started
 * within any window of `interval` milliseconds.
 */
class RateLimiter {
  /**
   * @param {number} limit - Maximum calls per window; must be at least 1.
   * @param {number} interval - Window length in milliseconds; must be >= 0.
   * @throws {RangeError} When `limit` or `interval` is out of range.
   */
  constructor(limit, interval) {
    if (!(limit >= 1)) {
      throw new RangeError(`limit must be >= 1, got ${limit}`);
    }
    if (!(interval >= 0)) {
      throw new RangeError(`interval must be >= 0, got ${interval}`);
    }
    this.limit = limit;
    this.interval = interval;
    // Start timestamps (ms) of calls inside the current window, oldest first.
    this.queue = [];
  }

  /**
   * Runs `fn` as soon as a slot is free in the current window.
   *
   * @param {Function} fn - Work to execute; may return a promise.
   * @returns {Promise<*>} Whatever `fn` returns or resolves to.
   */
  async execute(fn) {
    for (;;) {
      const now = Date.now();
      // Drop timestamps that have left the window.
      this.queue = this.queue.filter((startedAt) => now - startedAt < this.interval);
      if (this.queue.length < this.limit) {
        break;
      }
      // Sleep until the oldest call leaves the window, then re-check: other
      // waiters may have taken the freed slot in the meantime.
      const waitMs = this.interval - (now - this.queue[0]);
      await new Promise((resolve) => setTimeout(resolve, waitMs));
    }

    // No await between the check above and this push, so the slot is
    // claimed before any other waiter can run.
    this.queue.push(Date.now());
    return fn();
  }
}

// Usage: construct with limit = 10 and interval = 1000 ms, then route each
// request through execute() instead of calling fetch directly.
const limiter = new RateLimiter(10, 1000); // 10 requests per second
await limiter.execute(() => fetch('/api/endpoint'));

This comprehensive integration guide provides patterns and examples for effectively combining REST API calls with asynchronous event processing in the Ubyx platform ecosystem.