Skip to content

Connection API Code Examples

This guide provides practical code examples for working with the DBConvert Streams Connection API in various programming languages. For the complete API reference, see the Connection API Reference.

Prerequisites

Before using these examples, ensure you have:

  • A valid DBConvert Streams API key
  • The API server running (default: http://127.0.0.1:8020/api/v1)
  • Required language-specific dependencies:
    • Python: requests library (pip install requests)
    • Node.js: axios library (npm install axios)
    • PowerShell: Version 5.1 or later

Important

Before using any connection or stream endpoints, you must first call the /user/configs endpoint to load existing configurations:

bash
curl -X GET "http://127.0.0.1:8020/api/v1/user/configs" \
  -H "X-API-Key: your_api_key_here"

This endpoint loads all user configurations including existing connections and stream configurations. Make sure to call this endpoint:

  • When starting your application
  • Before listing or managing connections
  • Before creating or managing streams

Quick Start

Choose your preferred language:

  • curl - For quick command-line testing
  • Python - Using the requests library
  • Node.js - Using axios with async/await
  • PowerShell - Using PowerShell classes

Each example demonstrates:

  • Listing all connections
  • Creating a new connection
  • Testing a connection
  • Deleting a connection

Using curl

Simple command-line examples for quick testing:

bash
# Set your API key
export API_KEY="your_api_key_here"
export API_URL="http://127.0.0.1:8020/api/v1"

# Load user configurations (REQUIRED)
curl -X GET "$API_URL/user/configs" \
  -H "X-API-Key: $API_KEY"

# List all connections
curl -X GET "$API_URL/connections" \
  -H "X-API-Key: $API_KEY"

# Create a new MySQL connection
curl -X POST "$API_URL/connections" \
  -H "X-API-Key: $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mysql-source",
    "type": "mysql",
    "host": "localhost",
    "port": 3306,
    "username": "root",
    "password": "your_password",
    "database": "source_db"
  }'

# Test a connection
curl -X POST "$API_URL/connections/conn_id/ping" \
  -H "X-API-Key: $API_KEY"

# Delete a connection
curl -X DELETE "$API_URL/connections/conn_id" \
  -H "X-API-Key: $API_KEY"

Using Python

A complete Python class implementation with error handling:

python
import requests
from typing import Optional, Dict, List, Any

class DBConvertStreamsAPI:
    """Client for the DBConvert Streams API."""
    
    def __init__(self, api_key: str, base_url: str = "http://127.0.0.1:8020/api/v1"):
        """Initialize the API client.
        
        Args:
            api_key: Your DBConvert Streams API key
            base_url: The base URL of the API server
        """
        self.base_url = base_url
        self.headers = {
            "X-API-Key": api_key,
            "Content-Type": "application/json"
        }

    def load_user_configs(self) -> bool:
        """Load user configurations (REQUIRED before using other endpoints)."""
        try:
            response = requests.get(
                f"{self.base_url}/user/configs",
                headers=self.headers
            )
            response.raise_for_status()
            return True
        except requests.exceptions.RequestException as e:
            print(f"Error loading user configurations: {e}")
            return False

    def list_connections(self) -> Optional[List[Dict[str, Any]]]:
        """List all configured connections."""
        try:
            response = requests.get(
                f"{self.base_url}/connections", 
                headers=self.headers
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Error listing connections: {e}")
            return None

    def create_connection(self, connection_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Create a new database connection.
        
        Args:
            connection_data: Dictionary containing connection details
        """
        try:
            response = requests.post(
                f"{self.base_url}/connections",
                headers=self.headers,
                json=connection_data
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Error creating connection: {e}")
            return None

    def test_connection(self, conn_id: str) -> Optional[Dict[str, Any]]:
        """Test if a connection is working.
        
        Args:
            conn_id: The connection ID to test
        """
        try:
            response = requests.post(
                f"{self.base_url}/connections/{conn_id}/ping",
                headers=self.headers
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Error testing connection: {e}")
            return None

    def delete_connection(self, conn_id: str) -> bool:
        """Delete a connection.
        
        Args:
            conn_id: The connection ID to delete
        """
        try:
            response = requests.delete(
                f"{self.base_url}/connections/{conn_id}",
                headers=self.headers
            )
            response.raise_for_status()
            return True
        except requests.exceptions.RequestException as e:
            print(f"Error deleting connection: {e}")
            return False

# Usage example
def main():
    api = DBConvertStreamsAPI("your_api_key_here")
    
    # Load user configurations (REQUIRED)
    if not api.load_user_configs():
        print("Failed to load user configurations")
        return

    # Create a MySQL connection
    mysql_connection = {
        "name": "mysql-source",
        "type": "mysql",
        "host": "localhost",
        "port": 3306,
        "username": "root",
        "password": "your_password",
        "database": "source_db"
    }
    
    # Create and test the connection
    new_conn = api.create_connection(mysql_connection)
    if new_conn:
        print(f"Created connection: {new_conn['id']}")
        
        # Test the connection
        test_result = api.test_connection(new_conn['id'])
        if test_result and test_result['ping'] == 'ok':
            print("Connection test successful")
        else:
            print("Connection test failed")

if __name__ == "__main__":
    main()

Using Node.js

A TypeScript-friendly Node.js implementation with async/await:

typescript
import axios, { AxiosInstance } from 'axios';

interface ConnectionData {
    name: string;
    type: 'mysql' | 'postgresql';
    host: string;
    port: number;
    username: string;
    password: string;
    database: string;
    schema?: string;
}

interface ConnectionResponse extends Omit<ConnectionData, 'password'> {
    id: string;
}

interface PingResponse {
    ping: 'ok' | 'failed';
    error?: string;
}

class DBConvertStreamsAPI {
    private client: AxiosInstance;

    constructor(apiKey: string, baseURL: string = 'http://127.0.0.1:8020/api/v1') {
        this.client = axios.create({
            baseURL,
            headers: {
                'X-API-Key': apiKey,
                'Content-Type': 'application/json'
            }
        });
    }

    async loadUserConfigs(): Promise<boolean> {
        try {
            await this.client.get('/user/configs');
            return true;
        } catch (error) {
            console.error('Error loading user configurations:', 
                         error.response?.data || error.message);
            throw error;
        }
    }

    async listConnections(): Promise<ConnectionResponse[]> {
        try {
            const { data } = await this.client.get('/connections');
            return data;
        } catch (error) {
            console.error('Error listing connections:', 
                         error.response?.data || error.message);
            throw error;
        }
    }

    async createConnection(connectionData: ConnectionData): Promise<ConnectionResponse> {
        try {
            const { data } = await this.client.post('/connections', connectionData);
            return data;
        } catch (error) {
            console.error('Error creating connection:', 
                         error.response?.data || error.message);
            throw error;
        }
    }

    async testConnection(connId: string): Promise<PingResponse> {
        try {
            const { data } = await this.client.post(`/connections/${connId}/ping`);
            return data;
        } catch (error) {
            console.error('Error testing connection:', 
                         error.response?.data || error.message);
            throw error;
        }
    }

    async deleteConnection(connId: string): Promise<boolean> {
        try {
            await this.client.delete(`/connections/${connId}`);
            return true;
        } catch (error) {
            console.error('Error deleting connection:', 
                         error.response?.data || error.message);
            throw error;
        }
    }
}

// Usage example
async function main() {
    const api = new DBConvertStreamsAPI('your_api_key_here');

    try {
        // Load user configurations (REQUIRED)
        await api.loadUserConfigs();

        // Create a connection
        const mysqlConnection: ConnectionData = {
            name: 'mysql-source',
            type: 'mysql',
            host: 'localhost',
            port: 3306,
            username: 'root',
            password: 'your_password',
            database: 'source_db'
        };

        const newConn = await api.createConnection(mysqlConnection);
        console.log('Created connection:', newConn.id);

        // Test the connection
        const testResult = await api.testConnection(newConn.id);
        if (testResult.ping === 'ok') {
            console.log('Connection test successful');
        } else {
            console.log('Connection test failed:', testResult.error);
        }
    } catch (error) {
        console.error('Operation failed:', error);
    }
}

main();

Using PowerShell

A PowerShell class implementation with strong typing:

powershell
class DBConvertStreamsAPI {
    [string]$BaseURL
    [hashtable]$Headers

    DBConvertStreamsAPI([string]$ApiKey, [string]$BaseURL = "http://127.0.0.1:8020/api/v1") {
        $this.BaseURL = $BaseURL
        $this.Headers = @{
            "X-API-Key" = $ApiKey
            "Content-Type" = "application/json"
        }
    }

    [bool] LoadUserConfigs() {
        try {
            Invoke-RestMethod `
                -Uri "$($this.BaseURL)/user/configs" `
                -Headers $this.Headers `
                -Method Get
            return $true
        }
        catch {
            Write-Error "Error loading user configurations: $_"
            return $false
        }
    }

    [array] GetConnections() {
        try {
            $response = Invoke-RestMethod `
                -Uri "$($this.BaseURL)/connections" `
                -Headers $this.Headers `
                -Method Get
            return $response
        }
        catch {
            Write-Error "Error listing connections: $_"
            return $null
        }
    }

    [object] CreateConnection([hashtable]$ConnectionData) {
        try {
            $body = $ConnectionData | ConvertTo-Json
            $response = Invoke-RestMethod `
                -Uri "$($this.BaseURL)/connections" `
                -Headers $this.Headers `
                -Method Post `
                -Body $body
            return $response
        }
        catch {
            Write-Error "Error creating connection: $_"
            return $null
        }
    }

    [object] TestConnection([string]$ConnId) {
        try {
            $response = Invoke-RestMethod `
                -Uri "$($this.BaseURL)/connections/$ConnId/ping" `
                -Headers $this.Headers `
                -Method Post
            return $response
        }
        catch {
            Write-Error "Error testing connection: $_"
            return $null
        }
    }

    [bool] DeleteConnection([string]$ConnId) {
        try {
            Invoke-RestMethod `
                -Uri "$($this.BaseURL)/connections/$ConnId" `
                -Headers $this.Headers `
                -Method Delete
            return $true
        }
        catch {
            Write-Error "Error deleting connection: $_"
            return $false
        }
    }
}

# Usage example
$api = [DBConvertStreamsAPI]::new("your_api_key_here")

# Load user configurations (REQUIRED)
if (-not $api.LoadUserConfigs()) {
    Write-Error "Failed to load user configurations"
    return
}

# Create a connection
$mysqlConnection = @{
    name = "mysql-source"
    type = "mysql"
    host = "localhost"
    port = 3306
    username = "root"
    password = "your_password"
    database = "source_db"
}

$newConn = $api.CreateConnection($mysqlConnection)
if ($newConn) {
    Write-Host "Created connection: $($newConn.id)"
    
    # Test the connection
    $testResult = $api.TestConnection($newConn.id)
    if ($testResult.ping -eq "ok") {
        Write-Host "Connection test successful"
    }
    else {
        Write-Host "Connection test failed: $($testResult.error)"
    }
}

Error Handling

Each example includes comprehensive error handling:

  • All examples catch and handle network errors
  • Response status codes are checked
  • Error messages are logged with details
  • Type safety is enforced where possible

Common error scenarios:

  • 401: Invalid or missing API key
  • 404: Connection not found
  • 400: Invalid request data
  • 503: Service unavailable

Next Steps

DBConvert Streams - event driven replication for databases