💻 Xlytix Code Examples & Implementation

🐍 Python Client SDK
Complete Xlytix Python Client
```python
import requests
import time


class XlytixClient:
    """Xlytix API client for programmatic access."""

    def __init__(self, api_key, base_url="https://api.xlytix.com/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def run_sync(self, connector_id, trigger="manual"):
        """Trigger a sync job for a connector."""
        endpoint = f"{self.base_url}/connectors/{connector_id}/run"
        payload = {"trigger": trigger}
        response = requests.post(endpoint, headers=self.headers, json=payload)
        response.raise_for_status()
        return response.json()["run_id"]

    def get_sync_status(self, connector_id, run_id):
        """Fetch the current status of a sync run.

        NOTE: the exact status endpoint path is an assumption here; adjust it
        to match your Xlytix API reference.
        """
        endpoint = f"{self.base_url}/connectors/{connector_id}/runs/{run_id}"
        response = requests.get(endpoint, headers=self.headers)
        response.raise_for_status()
        return response.json()

    def wait_for_completion(self, connector_id, run_id, poll_interval=5):
        """Poll until the sync job completes or fails."""
        while True:
            status_data = self.get_sync_status(connector_id, run_id)
            status = status_data["status"]
            if status in ("completed", "failed"):
                print(f"Run {run_id} finished: {status}")
                return status_data
            time.sleep(poll_interval)


# Usage example
client = XlytixClient(api_key="YOUR_API_KEY")
run_id = client.run_sync("connector-uuid-123")
result = client.wait_for_completion("connector-uuid-123", run_id)
```
⚙️ Connector Configuration Examples
PostgreSQL to Snowflake
{ "connector_name": "postgres-to-snowflake", "source": { "type": "postgresql", "host": "postgres.example.com", "port": 5432, "database": "production", "tables": ["users", "orders"] }, "target": { "type": "snowflake", "account": "xy12345.us-east-1", "warehouse": "COMPUTE_WH", "database": "ANALYTICS" }, "sync": { "mode": "incremental", "schedule": "0 */6 * * *" } }
REST API to S3
{ "connector_name": "api-to-s3", "source": { "type": "rest_api", "endpoint": "https://api.example.com/data", "authentication": { "method": "oauth2", "client_id": "client_id" }, "pagination": { "type": "offset", "limit": 100 } }, "target": { "type": "s3", "bucket": "data-lake", "format": "parquet" } }
📋 Schema Management Code
Schema Detection & Evolution Handler
```python
from datetime import datetime


class SchemaManager:
    """Handles schema evolution and drift detection."""

    def detect_schema(self, data_sample):
        """Auto-detect a schema from a sample record."""
        schema = {
            "fields": [],
            "version": 1,
            "detected_at": datetime.now().isoformat()
        }
        for column, value in data_sample.items():
            field = {
                "name": column,
                "type": self._infer_type(value),
                "nullable": value is None
            }
            schema["fields"].append(field)
        return schema

    def _infer_type(self, value):
        """Map a Python value to a simple logical type name."""
        if isinstance(value, bool):
            return "boolean"
        if isinstance(value, int):
            return "integer"
        if isinstance(value, float):
            return "float"
        return "string"

    def compare_schemas(self, old_schema, new_schema):
        """Compare two schemas and report drift."""
        drift = {
            "added_fields": [],
            "removed_fields": [],
            "type_changes": []
        }
        old_fields = {f["name"]: f for f in old_schema["fields"]}
        new_fields = {f["name"]: f for f in new_schema["fields"]}

        # Detect added fields
        for name in new_fields:
            if name not in old_fields:
                drift["added_fields"].append(new_fields[name])

        # Detect removed fields and type changes
        for name, old_field in old_fields.items():
            if name not in new_fields:
                drift["removed_fields"].append(old_field)
            elif new_fields[name]["type"] != old_field["type"]:
                drift["type_changes"].append({
                    "field": name,
                    "old_type": old_field["type"],
                    "new_type": new_fields[name]["type"]
                })

        return drift
```
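A quick usage sketch with illustrative sample records:

```python
manager = SchemaManager()

old_schema = manager.detect_schema({"id": 1, "email": "a@example.com"})
new_schema = manager.detect_schema({"id": 1, "email": "a@example.com", "signup_ts": 1.7e9})

drift = manager.compare_schemas(old_schema, new_schema)
print(drift["added_fields"])
# [{'name': 'signup_ts', 'type': 'float', 'nullable': False}]
```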
🔄 Incremental Sync Implementation
Incremental Sync Engine
```python
class IncrementalSyncEngine:
    """Handles incremental data synchronization."""

    def __init__(self, connector_config):
        self.config = connector_config
        self.checkpoint_manager = CheckpointManager()

    def sync(self):
        """Execute an incremental sync."""
        # Resume from the last successful checkpoint
        checkpoint = self.checkpoint_manager.get_checkpoint(
            self.config["connector_id"]
        )

        # Fetch only data changed since the checkpoint
        new_data = self._fetch_incremental_data(checkpoint)

        # Transform and load the new records
        processed_data = self._process_data(new_data)
        self._load_data(processed_data)

        # Advance the checkpoint so the next run picks up where this one ended
        new_checkpoint = self._calculate_checkpoint(new_data)
        self.checkpoint_manager.save_checkpoint(
            self.config["connector_id"], new_checkpoint
        )

        return {
            "records_processed": len(new_data),
            "checkpoint": new_checkpoint
        }

    def _fetch_from_database(self, checkpoint):
        """Fetch rows changed since the last checkpoint."""
        # Table and column names come from trusted connector config; the
        # checkpoint value is passed as a bind parameter rather than being
        # interpolated into the SQL string.
        query = f"""
            SELECT *
            FROM {self.config['table_name']}
            WHERE {self.config['timestamp_column']} > %(since)s
            ORDER BY {self.config['timestamp_column']}
        """
        return self._execute_query(query, {"since": checkpoint["timestamp"]})
```
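The engine above depends on a `CheckpointManager` that is not shown. A minimal file-backed sketch is given below; the real service presumably persists checkpoints in a database or the Xlytix control plane rather than on local disk, so treat this as an assumption-laden stand-in.

```python
import json
from pathlib import Path


class CheckpointManager:
    """Minimal file-backed checkpoint store (illustrative only)."""

    def __init__(self, path="checkpoints.json"):
        self.path = Path(path)

    def _load(self):
        if self.path.exists():
            return json.loads(self.path.read_text())
        return {}

    def get_checkpoint(self, connector_id):
        # Default to the epoch so the first run performs a full load
        return self._load().get(
            connector_id, {"timestamp": "1970-01-01T00:00:00"}
        )

    def save_checkpoint(self, connector_id, checkpoint):
        data = self._load()
        data[connector_id] = checkpoint
        self.path.write_text(json.dumps(data, indent=2))
```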
🛡️ Error Handling & Retry Logic
Retry Handler
```python
import time


class RetryHandler:
    """Retry a callable with exponential backoff."""

    def __init__(self, max_retries=3):
        self.max_retries = max_retries
        self.base_delay = 1  # seconds

    def execute_with_retry(self, func):
        for attempt in range(self.max_retries):
            try:
                return func()
            except Exception as e:
                # Re-raise once the retry budget is exhausted
                if attempt == self.max_retries - 1:
                    raise
                delay = self.base_delay * (2 ** attempt)
                print(f"Attempt {attempt + 1} failed ({e}); retrying in {delay}s...")
                time.sleep(delay)
```
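A quick usage sketch, wrapping the client call from the first example:

```python
retry = RetryHandler(max_retries=3)

# Retries transient failures (network blips, 5xx responses) up to three times
run_id = retry.execute_with_retry(
    lambda: client.run_sync("connector-uuid-123")
)
```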
Error Handler
```python
from datetime import datetime


class ErrorHandler:
    """Centralized error handling."""

    # Error types treated as critical; adjust to your environment
    CRITICAL_ERRORS = ("ConnectionError", "AuthenticationError")

    def __init__(self):
        self.error_log = []

    def handle_error(self, error, context):
        """Record an error and alert if it is critical."""
        error_record = {
            "timestamp": datetime.now().isoformat(),
            "error_type": type(error).__name__,
            "message": str(error),
            "context": context
        }
        self.error_log.append(error_record)

        if self._is_critical(error):
            self._send_alert(error_record)

        return error_record

    def _is_critical(self, error):
        return type(error).__name__ in self.CRITICAL_ERRORS

    def _send_alert(self, error_record):
        # Placeholder: wire this to your alerting channel (Slack, PagerDuty, ...)
        print(f"ALERT: {error_record['error_type']}: {error_record['message']}")
```
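Typical usage alongside the client from the first example:

```python
handler = ErrorHandler()

try:
    client.run_sync("connector-uuid-123")
except Exception as exc:
    handler.handle_error(exc, context={"connector_id": "connector-uuid-123"})
```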
⚙️ Data Processing Engine
Data Processor with Transformations
```python
class DataProcessor:
    """Core data processing engine."""

    def process(self, data, config):
        """Run records through the transform -> validate -> filter pipeline."""
        # Apply configured transformations
        transformed_data = self._apply_transformations(data, config)

        # Validate the transformed records
        validation_results = self._validate_data(transformed_data, config)

        # Drop records that failed validation
        valid_data = self._filter_valid_records(
            transformed_data, validation_results
        )

        return {
            "data": valid_data,
            "validation_results": validation_results,
            "metrics": self._calculate_metrics(data, valid_data)
        }

    def _apply_transformations(self, data, config):
        """Apply each configured transformation in order."""
        result = data
        for transformer in config.get("transformations", []):
            result = self._apply_transformer(result, transformer)
        return result
```
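The per-transformer dispatch (`_apply_transformer`) is not shown above. The standalone sketch below illustrates the kind of dispatch it might perform over a list of dict records, using two made-up transformation types (`rename` and `filter`); the actual Xlytix transformation vocabulary may differ.

```python
def apply_transformer(data, transformer):
    """Illustrative dispatch for a single transformation over dict records."""
    ttype = transformer["type"]
    if ttype == "rename":
        # e.g. {"type": "rename", "from": "usr_id", "to": "user_id"}
        return [
            {(transformer["to"] if key == transformer["from"] else key): value
             for key, value in record.items()}
            for record in data
        ]
    if ttype == "filter":
        # e.g. {"type": "filter", "field": "status", "equals": "active"}
        return [r for r in data if r.get(transformer["field"]) == transformer["equals"]]
    raise ValueError(f"Unknown transformation type: {ttype}")
```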
📊 Monitoring & Metrics
Monitoring Service Implementation
```python
from datetime import datetime


class MonitoringService:
    """Monitoring and observability service."""

    def track_sync_job(self, job_id, metrics):
        """Record metrics for a sync job and alert on anomalies."""
        self.metrics_collector.record({
            "job_id": job_id,
            "timestamp": datetime.now().isoformat(),
            "records_processed": metrics["records_processed"],
            "duration_seconds": metrics["duration"],
            "bytes_transferred": metrics["bytes"],
            "status": metrics["status"],
            "error_count": metrics.get("errors", 0)
        })

        # Check for anomalies (e.g. a sudden drop in record count or spike in errors)
        if self._detect_anomaly(metrics):
            self.alert_manager.send_alert({
                "severity": "warning",
                "message": f"Anomaly detected in job {job_id}",
                "metrics": metrics
            })

    def get_connector_health(self, connector_id):
        """Summarize connector health from its recent run history."""
        recent_runs = self._get_recent_runs(connector_id, limit=10)
        if not recent_runs:
            # No history yet -- report unknown rather than dividing by zero
            return {
                "connector_id": connector_id,
                "success_rate": None,
                "health_status": "unknown"
            }

        success_rate = sum(
            1 for r in recent_runs if r["status"] == "completed"
        ) / len(recent_runs)

        return {
            "connector_id": connector_id,
            "success_rate": success_rate,
            "health_status": self._calculate_health(success_rate)
        }
```
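The health classification itself (`_calculate_health`) is not defined above. One reasonable threshold mapping is sketched below; the cutoffs are illustrative assumptions, not Xlytix defaults.

```python
def calculate_health(success_rate):
    """Map a success rate (0.0-1.0) to a coarse health label.

    Thresholds are illustrative only; tune them to your own SLOs.
    """
    if success_rate >= 0.9:
        return "healthy"
    if success_rate >= 0.7:
        return "degraded"
    return "unhealthy"
```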