Connection API Code Examples
This guide provides practical code examples for working with the DBConvert Streams Connection API in various programming languages. For the complete API reference, see the Connection API Reference.
Prerequisites
Before using these examples, ensure you have:
- A valid DBConvert Streams API key
- The API server running (default:
http://127.0.0.1:8020/api/v1
) - Required language-specific dependencies:
- Python:
requests
library (pip install requests
) - Node.js:
axios
library (npm install axios
) - PowerShell: Version 5.1 or later
- Python:
Important
Before using any connection or stream endpoints, you must first call the /user/configs
endpoint to load existing configurations:
curl -X GET "http://127.0.0.1:8020/api/v1/user/configs" \
-H "X-API-Key: your_api_key_here"
This endpoint loads all user configurations including existing connections and stream configurations. Make sure to call this endpoint:
- When starting your application
- Before listing or managing connections
- Before creating or managing streams
Quick Start
Choose your preferred language:
- curl - For quick command-line testing
- Python - Using the requests library
- Node.js - Using axios with async/await
- PowerShell - Using PowerShell classes
Each example demonstrates:
- Listing all connections
- Creating a new connection
- Testing a connection
- Deleting a connection
Using curl
Simple command-line examples for quick testing:
# Set your API key
export API_KEY="your_api_key_here"
export API_URL="http://127.0.0.1:8020/api/v1"
# Load user configurations (REQUIRED)
curl -X GET "$API_URL/user/configs" \
-H "X-API-Key: $API_KEY"
# List all connections
curl -X GET "$API_URL/connections" \
-H "X-API-Key: $API_KEY"
# Create a new MySQL connection
curl -X POST "$API_URL/connections" \
-H "X-API-Key: $API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "mysql-source",
"type": "mysql",
"host": "localhost",
"port": 3306,
"username": "root",
"password": "your_password",
"database": "source_db"
}'
# Test a connection
curl -X POST "$API_URL/connections/conn_id/ping" \
-H "X-API-Key: $API_KEY"
# Delete a connection
curl -X DELETE "$API_URL/connections/conn_id" \
-H "X-API-Key: $API_KEY"
Using Python
A complete Python class implementation with error handling:
import requests
from typing import Optional, Dict, List, Any
class DBConvertStreamsAPI:
"""Client for the DBConvert Streams API."""
def __init__(self, api_key: str, base_url: str = "http://127.0.0.1:8020/api/v1"):
"""Initialize the API client.
Args:
api_key: Your DBConvert Streams API key
base_url: The base URL of the API server
"""
self.base_url = base_url
self.headers = {
"X-API-Key": api_key,
"Content-Type": "application/json"
}
def load_user_configs(self) -> bool:
"""Load user configurations (REQUIRED before using other endpoints)."""
try:
response = requests.get(
f"{self.base_url}/user/configs",
headers=self.headers
)
response.raise_for_status()
return True
except requests.exceptions.RequestException as e:
print(f"Error loading user configurations: {e}")
return False
def list_connections(self) -> Optional[List[Dict[str, Any]]]:
"""List all configured connections."""
try:
response = requests.get(
f"{self.base_url}/connections",
headers=self.headers
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Error listing connections: {e}")
return None
def create_connection(self, connection_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Create a new database connection.
Args:
connection_data: Dictionary containing connection details
"""
try:
response = requests.post(
f"{self.base_url}/connections",
headers=self.headers,
json=connection_data
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Error creating connection: {e}")
return None
def test_connection(self, conn_id: str) -> Optional[Dict[str, Any]]:
"""Test if a connection is working.
Args:
conn_id: The connection ID to test
"""
try:
response = requests.post(
f"{self.base_url}/connections/{conn_id}/ping",
headers=self.headers
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Error testing connection: {e}")
return None
def delete_connection(self, conn_id: str) -> bool:
"""Delete a connection.
Args:
conn_id: The connection ID to delete
"""
try:
response = requests.delete(
f"{self.base_url}/connections/{conn_id}",
headers=self.headers
)
response.raise_for_status()
return True
except requests.exceptions.RequestException as e:
print(f"Error deleting connection: {e}")
return False
# Usage example
def main():
api = DBConvertStreamsAPI("your_api_key_here")
# Load user configurations (REQUIRED)
if not api.load_user_configs():
print("Failed to load user configurations")
return
# Create a MySQL connection
mysql_connection = {
"name": "mysql-source",
"type": "mysql",
"host": "localhost",
"port": 3306,
"username": "root",
"password": "your_password",
"database": "source_db"
}
# Create and test the connection
new_conn = api.create_connection(mysql_connection)
if new_conn:
print(f"Created connection: {new_conn['id']}")
# Test the connection
test_result = api.test_connection(new_conn['id'])
if test_result and test_result['ping'] == 'ok':
print("Connection test successful")
else:
print("Connection test failed")
if __name__ == "__main__":
main()
Using Node.js
A TypeScript-friendly Node.js implementation with async/await:
import axios, { AxiosInstance } from 'axios';
interface ConnectionData {
name: string;
type: 'mysql' | 'postgresql';
host: string;
port: number;
username: string;
password: string;
database: string;
schema?: string;
}
interface ConnectionResponse extends Omit<ConnectionData, 'password'> {
id: string;
}
interface PingResponse {
ping: 'ok' | 'failed';
error?: string;
}
class DBConvertStreamsAPI {
private client: AxiosInstance;
constructor(apiKey: string, baseURL: string = 'http://127.0.0.1:8020/api/v1') {
this.client = axios.create({
baseURL,
headers: {
'X-API-Key': apiKey,
'Content-Type': 'application/json'
}
});
}
async loadUserConfigs(): Promise<boolean> {
try {
await this.client.get('/user/configs');
return true;
} catch (error) {
console.error('Error loading user configurations:',
error.response?.data || error.message);
throw error;
}
}
async listConnections(): Promise<ConnectionResponse[]> {
try {
const { data } = await this.client.get('/connections');
return data;
} catch (error) {
console.error('Error listing connections:',
error.response?.data || error.message);
throw error;
}
}
async createConnection(connectionData: ConnectionData): Promise<ConnectionResponse> {
try {
const { data } = await this.client.post('/connections', connectionData);
return data;
} catch (error) {
console.error('Error creating connection:',
error.response?.data || error.message);
throw error;
}
}
async testConnection(connId: string): Promise<PingResponse> {
try {
const { data } = await this.client.post(`/connections/${connId}/ping`);
return data;
} catch (error) {
console.error('Error testing connection:',
error.response?.data || error.message);
throw error;
}
}
async deleteConnection(connId: string): Promise<boolean> {
try {
await this.client.delete(`/connections/${connId}`);
return true;
} catch (error) {
console.error('Error deleting connection:',
error.response?.data || error.message);
throw error;
}
}
}
// Usage example
async function main() {
const api = new DBConvertStreamsAPI('your_api_key_here');
try {
// Load user configurations (REQUIRED)
await api.loadUserConfigs();
// Create a connection
const mysqlConnection: ConnectionData = {
name: 'mysql-source',
type: 'mysql',
host: 'localhost',
port: 3306,
username: 'root',
password: 'your_password',
database: 'source_db'
};
const newConn = await api.createConnection(mysqlConnection);
console.log('Created connection:', newConn.id);
// Test the connection
const testResult = await api.testConnection(newConn.id);
if (testResult.ping === 'ok') {
console.log('Connection test successful');
} else {
console.log('Connection test failed:', testResult.error);
}
} catch (error) {
console.error('Operation failed:', error);
}
}
main();
Using PowerShell
A PowerShell class implementation with strong typing:
class DBConvertStreamsAPI {
[string]$BaseURL
[hashtable]$Headers
DBConvertStreamsAPI([string]$ApiKey, [string]$BaseURL = "http://127.0.0.1:8020/api/v1") {
$this.BaseURL = $BaseURL
$this.Headers = @{
"X-API-Key" = $ApiKey
"Content-Type" = "application/json"
}
}
[bool] LoadUserConfigs() {
try {
Invoke-RestMethod `
-Uri "$($this.BaseURL)/user/configs" `
-Headers $this.Headers `
-Method Get
return $true
}
catch {
Write-Error "Error loading user configurations: $_"
return $false
}
}
[array] GetConnections() {
try {
$response = Invoke-RestMethod `
-Uri "$($this.BaseURL)/connections" `
-Headers $this.Headers `
-Method Get
return $response
}
catch {
Write-Error "Error listing connections: $_"
return $null
}
}
[object] CreateConnection([hashtable]$ConnectionData) {
try {
$body = $ConnectionData | ConvertTo-Json
$response = Invoke-RestMethod `
-Uri "$($this.BaseURL)/connections" `
-Headers $this.Headers `
-Method Post `
-Body $body
return $response
}
catch {
Write-Error "Error creating connection: $_"
return $null
}
}
[object] TestConnection([string]$ConnId) {
try {
$response = Invoke-RestMethod `
-Uri "$($this.BaseURL)/connections/$ConnId/ping" `
-Headers $this.Headers `
-Method Post
return $response
}
catch {
Write-Error "Error testing connection: $_"
return $null
}
}
[bool] DeleteConnection([string]$ConnId) {
try {
Invoke-RestMethod `
-Uri "$($this.BaseURL)/connections/$ConnId" `
-Headers $this.Headers `
-Method Delete
return $true
}
catch {
Write-Error "Error deleting connection: $_"
return $false
}
}
}
# Usage example
$api = [DBConvertStreamsAPI]::new("your_api_key_here")
# Load user configurations (REQUIRED)
if (-not $api.LoadUserConfigs()) {
Write-Error "Failed to load user configurations"
return
}
# Create a connection
$mysqlConnection = @{
name = "mysql-source"
type = "mysql"
host = "localhost"
port = 3306
username = "root"
password = "your_password"
database = "source_db"
}
$newConn = $api.CreateConnection($mysqlConnection)
if ($newConn) {
Write-Host "Created connection: $($newConn.id)"
# Test the connection
$testResult = $api.TestConnection($newConn.id)
if ($testResult.ping -eq "ok") {
Write-Host "Connection test successful"
}
else {
Write-Host "Connection test failed: $($testResult.error)"
}
}
Error Handling
Each example includes comprehensive error handling:
- All examples catch and handle network errors
- Response status codes are checked
- Error messages are logged with details
- Type safety is enforced where possible
Common error scenarios:
401
: Invalid or missing API key404
: Connection not found400
: Invalid request data503
: Service unavailable
Next Steps
- View the Connection API Reference for detailed endpoint documentation
- Learn about Stream Configuration to set up data streams