Python

Complete examples for integrating Turnpike API with Python.

Installation

pip install requests solana python-dotenv websocket-client

Lightning API Examples

Execute Buy Trade

import requests
import os
from dotenv import load_dotenv

load_dotenv()

def execute_buy_trade(public_key: str, mint: str, amount: float, slippage: float = 10):
    """Execute a buy trade using the Lightning API"""

    url = 'https://api.turnpike.dev/trade/buy'
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {os.getenv("TURNPIKE_API_KEY")}'
    }
    payload = {
        'publicKey': public_key,
        'mint': mint,
        'amount': amount,
        'slippage': slippage,
        'priorityFee': 0.0001
    }

    try:
        response = requests.post(url, headers=headers, json=payload)
        response.raise_for_status()

        data = response.json()
        print(f'Trade successful!')
        print(f'Signature: {data["signature"]}')
        print(f'Tokens received: {data["tokensReceived"]}')
        print(f'Effective price: {data["effectivePrice"]}')

        return data
    except requests.exceptions.RequestException as e:
        print(f'Error executing trade: {e}')
        if hasattr(e.response, 'json'):
            print(e.response.json())
        raise

# Usage
execute_buy_trade(
    public_key='YOUR_WALLET_PUBLIC_KEY',
    mint='EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v',
    amount=0.01
)

Get Token Information

def get_token_info(mint: str) -> dict:
    """Get detailed information about a token"""

    url = f'https://api.turnpike.dev/token/info/{mint}'
    headers = {
        'Authorization': f'Bearer {os.getenv("TURNPIKE_API_KEY")}'
    }

    response = requests.get(url, headers=headers)
    response.raise_for_status()

    data = response.json()
    print(f'{data["symbol"]}: ${data["price"]}')
    print(f'Market Cap: ${data["marketCap"]:,}')
    print(f'24h Volume: ${data["volume24h"]:,}')

    return data

# Usage
token_info = get_token_info('EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v')

Get Portfolio

def get_portfolio(public_key: str) -> dict:
    """Get all token holdings for a wallet"""

    url = f'https://api.turnpike.dev/portfolio/{public_key}'
    headers = {
        'Authorization': f'Bearer {os.getenv("TURNPIKE_API_KEY")}'
    }

    response = requests.get(url, headers=headers)
    response.raise_for_status()

    data = response.json()
    print(f'Total Value: ${data["totalValueUsd"]:,.2f}')
    print(f'SOL Balance: {data["solBalance"]}')
    print(f'Tokens: {len(data["tokens"])}')

    for token in data['tokens']:
        print(f'  {token["symbol"]}: {token["balance"]} (${token["valueUsd"]:,.2f})')

    return data

# Usage
portfolio = get_portfolio('YOUR_WALLET_PUBLIC_KEY')

Local Transaction API Examples

Build and Sign Transaction

from solana.rpc.api import Client
from solana.transaction import Transaction
from solders.keypair import Keypair
import base64

def build_and_sign_trade(wallet: Keypair, mint: str, amount: float):
    """Build, sign, and send a transaction locally"""

    # 1. Build transaction via API
    url = 'https://api.turnpike.dev/transaction/buy'
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {os.getenv("TURNPIKE_API_KEY")}'
    }
    payload = {
        'publicKey': str(wallet.pubkey()),
        'mint': mint,
        'amount': amount,
        'slippage': 10
    }

    response = requests.post(url, headers=headers, json=payload)
    response.raise_for_status()
    data = response.json()

    # 2. Deserialize transaction
    transaction_bytes = base64.b64decode(data['transaction'])
    transaction = Transaction.deserialize(transaction_bytes)

    # 3. Connect to RPC
    client = Client('https://api.mainnet-beta.solana.com')

    # 4. Sign transaction
    transaction.sign(wallet)

    # 5. Send transaction
    result = client.send_raw_transaction(transaction.serialize())
    signature = result['result']

    print(f'Transaction sent: {signature}')

    # 6. Confirm transaction
    client.confirm_transaction(signature)
    print(f'Transaction confirmed')

    return signature

# Usage
# Load your wallet (securely)
secret_key_bytes = base64.b64decode(os.getenv('WALLET_PRIVATE_KEY'))
wallet = Keypair.from_bytes(secret_key_bytes)

signature = build_and_sign_trade(
    wallet=wallet,
    mint='TOKEN_MINT_ADDRESS',
    amount=0.01
)

WebSocket Examples

Basic WebSocket Connection

import websocket
import json
import threading

class TurnpikeStream:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.ws = None

    def on_message(self, ws, message):
        data = json.loads(message)

        if data['type'] == 'trade':
            print(f'Trade: {data["side"]} {data["amount"]} @ ${data["price"]}')
        elif data['type'] == 'newToken':
            print(f'New token: {data["symbol"]} - {data["name"]}')

    def on_error(self, ws, error):
        print(f'Error: {error}')

    def on_close(self, ws, close_status_code, close_msg):
        print('Connection closed')

    def on_open(self, ws):
        print('Connected to Turnpike WebSocket')

        # Authenticate
        ws.send(json.dumps({
            'method': 'authenticate',
            'apiKey': self.api_key
        }))

        # Subscribe to trades
        ws.send(json.dumps({
            'method': 'subscribe',
            'keys': ['trades', 'newTokens']
        }))

    def connect(self):
        websocket.enableTrace(False)
        self.ws = websocket.WebSocketApp(
            'wss://turnpike.dev/api/data',
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )

        # Run in background thread
        wst = threading.Thread(target=self.ws.run_forever)
        wst.daemon = True
        wst.start()

    def disconnect(self):
        if self.ws:
            self.ws.close()

# Usage
stream = TurnpikeStream(os.getenv('TURNPIKE_API_KEY'))
stream.connect()

# Keep running
import time
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    stream.disconnect()

Async WebSocket with Websockets Library

import asyncio
import websockets
import json

async def stream_trades(api_key: str):
    uri = 'wss://turnpike.dev/api/data'

    async with websockets.connect(uri) as websocket:
        # Authenticate
        await websocket.send(json.dumps({
            'method': 'authenticate',
            'apiKey': api_key
        }))

        # Subscribe
        await websocket.send(json.dumps({
            'method': 'subscribe',
            'keys': ['trades']
        }))

        # Listen for messages
        async for message in websocket:
            data = json.loads(message)

            if data['type'] == 'trade':
                print(f'{data["side"]} {data["amount"]} @ ${data["price"]}')

# Usage
asyncio.run(stream_trades(os.getenv('TURNPIKE_API_KEY')))

Complete Trading Bot Example

import requests
import websocket
import json
import time
from typing import Dict, Any

class TradingBot:
    def __init__(self, api_key: str, public_key: str):
        self.api_key = api_key
        self.public_key = public_key
        self.ws = None
        self.target_tokens = {}  # mint -> target_price

    def add_target(self, mint: str, target_price: float):
        """Add a token to watch"""
        self.target_tokens[mint] = target_price

    def execute_buy(self, mint: str, amount: float):
        """Execute a buy trade"""
        try:
            response = requests.post(
                'https://api.turnpike.dev/trade/buy',
                headers={
                    'Content-Type': 'application/json',
                    'Authorization': f'Bearer {self.api_key}'
                },
                json={
                    'publicKey': self.public_key,
                    'mint': mint,
                    'amount': amount,
                    'slippage': 15
                }
            )
            response.raise_for_status()
            data = response.json()
            print(f'Buy executed: {data["signature"]}')
            return data
        except Exception as e:
            print(f'Buy failed: {e}')
            return None

    def on_price_update(self, data: Dict[str, Any]):
        """Handle price update events"""
        mint = data['mint']

        if mint in self.target_tokens:
            target_price = self.target_tokens[mint]
            current_price = data['price']

            if current_price <= target_price:
                print(f'Target price reached for {mint}!')
                self.execute_buy(mint, 0.01)
                # Remove from targets after buying
                del self.target_tokens[mint]

    def on_message(self, ws, message):
        data = json.loads(message)

        if data['type'] == 'priceUpdate':
            self.on_price_update(data)

    def on_open(self, ws):
        print('Bot connected')

        # Authenticate
        ws.send(json.dumps({
            'method': 'authenticate',
            'apiKey': self.api_key
        }))

        # Subscribe to price updates
        ws.send(json.dumps({
            'method': 'subscribe',
            'keys': ['priceUpdates']
        }))

    def start(self):
        """Start the trading bot"""
        self.ws = websocket.WebSocketApp(
            'wss://turnpike.dev/api/data',
            on_open=self.on_open,
            on_message=self.on_message
        )

        self.ws.run_forever()

# Usage
bot = TradingBot(
    api_key=os.getenv('TURNPIKE_API_KEY'),
    public_key='YOUR_WALLET_PUBLIC_KEY'
)

# Add tokens to watch
bot.add_target('TOKEN_MINT_1', 0.00001)
bot.add_target('TOKEN_MINT_2', 0.00005)

# Start bot
bot.start()

Error Handling Example

import time
from typing import Optional

class TurnpikeAPIError(Exception):
    def __init__(self, error_data: dict, status_code: int):
        self.code = error_data.get('code')
        self.message = error_data.get('message')
        self.details = error_data.get('details', {})
        self.status_code = status_code
        super().__init__(self.message)

def execute_trade_with_retry(
    params: dict,
    max_retries: int = 3
) -> Optional[dict]:
    """Execute trade with automatic retry logic"""

    for attempt in range(max_retries):
        try:
            response = requests.post(
                'https://api.turnpike.dev/trade/buy',
                headers={
                    'Content-Type': 'application/json',
                    'Authorization': f'Bearer {os.getenv("TURNPIKE_API_KEY")}'
                },
                json=params
            )

            if not response.ok:
                error_data = response.json()
                raise TurnpikeAPIError(error_data['error'], response.status_code)

            return response.json()

        except TurnpikeAPIError as e:
            if e.code == 'RATE_LIMIT_EXCEEDED':
                wait_time = 2 ** attempt
                print(f'Rate limited, waiting {wait_time}s...')
                time.sleep(wait_time)
                continue
            elif e.code == 'SLIPPAGE_EXCEEDED' and attempt < max_retries - 1:
                params['slippage'] *= 1.5
                print(f'Slippage exceeded, increasing to {params["slippage"]}%')
                continue
            else:
                print(f'Error: {e.code} - {e.message}')
                raise

        except requests.RequestException as e:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            raise

    raise Exception(f'Failed after {max_retries} attempts')

# Usage
result = execute_trade_with_retry({
    'publicKey': 'YOUR_WALLET_PUBLIC_KEY',
    'mint': 'TOKEN_MINT_ADDRESS',
    'amount': 0.01,
    'slippage': 10
})

Async/Await Examples

import asyncio
import aiohttp

async def async_execute_trade(public_key: str, mint: str, amount: float):
    """Execute trade asynchronously"""

    url = 'https://api.turnpike.dev/trade/buy'
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {os.getenv("TURNPIKE_API_KEY")}'
    }
    payload = {
        'publicKey': public_key,
        'mint': mint,
        'amount': amount,
        'slippage': 10
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, json=payload) as response:
            data = await response.json()
            return data

async def main():
    # Execute multiple trades concurrently
    trades = [
        async_execute_trade('PUBLIC_KEY', 'MINT_1', 0.01),
        async_execute_trade('PUBLIC_KEY', 'MINT_2', 0.01),
        async_execute_trade('PUBLIC_KEY', 'MINT_3', 0.01)
    ]

    results = await asyncio.gather(*trades, return_exceptions=True)

    for result in results:
        if isinstance(result, Exception):
            print(f'Trade failed: {result}')
        else:
            print(f'Trade successful: {result["signature"]}')

# Run
asyncio.run(main())

Last updated