
LLM Cost Optimization Part 2: Advanced Techniques for Scale

2025-01-20 · 30 min read

From sub-second responses to 10x throughput improvements. Master the advanced optimization techniques that power production LLM applications.

Where We Left Off

In Part 1, we covered the essentials:

✅ Observability with Langfuse
✅ Response caching (100% savings on duplicates)
✅ Smart model selection (90% cost reduction)
✅ Structured outputs (reliable small models)

Now let's dive into advanced techniques that will take your optimization to the next level.

5. Prompt Caching: Reuse Computed Tokens

When you have long, repeated contexts (system prompts, documentation, examples), prompt caching can save 90% on those tokens.

How Prompt Caching Works

Instead of reprocessing the same prompt prefix every time, LLMs cache the computed attention keys and values (KV cache) for reuse.
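
To get a feel for the economics before writing any code, here is a minimal back-of-the-envelope sketch. The numbers are assumptions, not provider pricing: it uses the ~90% discount on cache reads quoted above, a hypothetical ~25% surcharge on the initial cache write, and a placeholder input price.

def prompt_cache_savings(prefix_tokens: int, requests: int,
                         input_price_per_m: float = 3.00,   # placeholder $ per 1M input tokens
                         read_discount: float = 0.90,       # ~90% off cached reads
                         write_premium: float = 0.25) -> dict:  # assumed cache-write surcharge
    """Rough estimate of savings from caching a shared prompt prefix."""
    base = prefix_tokens / 1_000_000 * input_price_per_m

    # Without caching: every request pays full price for the prefix
    without_cache = base * requests

    # With caching: one cache write (with premium), then discounted reads
    with_cache = base * (1 + write_premium) + base * (1 - read_discount) * (requests - 1)

    return {
        "without_cache": round(without_cache, 4),
        "with_cache": round(with_cache, 4),
        "savings_pct": round((1 - with_cache / without_cache) * 100, 1),
    }

# Example: a 5,000-token system prompt reused across 1,000 requests
print(prompt_cache_savings(prefix_tokens=5_000, requests=1_000))

Once the prefix is reused more than a handful of times, the savings converge toward the read discount itself.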

Implementation with LiteLLM

import litellm
from litellm import completion
import os

# Anthropic Prompt Caching Example
os.environ["ANTHROPIC_API_KEY"] = "your-key"

# First request - creates cache
response1 = completion(
    model="anthropic/claude-3-5-sonnet-20241022",
    messages=[
        {
            "role": "system",
            "content": [
                {
                    "type": "text",
                    "text": "You are an expert code reviewer. Here are the coding standards..." * 100,
                    "cache_control": {"type": "ephemeral"}  # Cache this part
                }
            ]
        },
        {
            "role": "user",
            "content": "Review this code: print('hello')"
        }
    ]
)

print(f"Cache creation tokens: {response1.usage.cache_creation_input_tokens}")
print(f"Cache read tokens: {response1.usage.cache_read_input_tokens}")

# Second request - uses cache (90% cheaper for cached portion)
response2 = completion(
    model="anthropic/claude-3-5-sonnet-20241022",
    messages=[
        {
            "role": "system",
            "content": [
                {
                    "type": "text",
                    "text": "You are an expert code reviewer. Here are the coding standards..." * 100,
                    "cache_control": {"type": "ephemeral"}  # Reuse cache
                }
            ]
        },
        {
            "role": "user",
            "content": "Review this code: x = 5"  # Different query
        }
    ]
)

print(f"Cache hit! Read tokens: {response2.usage.cache_read_input_tokens}")

OpenAI Prompt Caching (Automatic)

# OpenAI automatically caches prompts ≥1024 tokens
# No special configuration needed!

import litellm

# Long system prompt (>1024 tokens)
system_prompt = "You are an expert assistant. " * 200  # ~1200 tokens

responses = []
for i in range(5):
    response = litellm.completion(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Question {i}: What is {i}+{i}?"}
        ]
    )

    # Check how many prompt tokens were served from the prompt cache
    # (prompt_tokens_details may be absent or an object, depending on the litellm version)
    details = getattr(response.usage, 'prompt_tokens_details', None)
    if details:
        cached = getattr(details, 'cached_tokens', 0) or 0
        print(f"Request {i}: Cached tokens: {cached}")

Cost Analysis Tool

def analyze_caching_savings(base_prompt: str, queries: list, model: str):
    """Calculate savings from prompt caching"""

    total_cost_without_cache = 0
    total_cost_with_cache = 0

    for i, query in enumerate(queries):
        response = litellm.completion(
            model=model,
            messages=[
                {"role": "system", "content": base_prompt},
                {"role": "user", "content": query}
            ]
        )

        # Calculate costs
        standard_cost = litellm.completion_cost(completion_response=response)

        # Estimate cached cost (if applicable)
        details = getattr(response.usage, 'prompt_tokens_details', None)
        if details:
            cached_tokens = getattr(details, 'cached_tokens', 0) or 0
            # Rough estimate: cached tokens billed at ~10% of the normal rate,
            # spread over the average cost per token of this response
            cache_discount = cached_tokens * 0.9 * (standard_cost / response.usage.total_tokens)
            cached_cost = standard_cost - cache_discount
        else:
            cached_cost = standard_cost

        total_cost_without_cache += standard_cost
        total_cost_with_cache += cached_cost

        print(f"Query {i}: Standard ${standard_cost:.4f} | Cached ${cached_cost:.4f}")

    savings = (1 - total_cost_with_cache/total_cost_without_cache) * 100
    print(f"\nTotal Savings: {savings:.1f}%")
    print(f"Break-even point: {2 if model.startswith('anthropic') else 1} requests")


6. Batch Calling: Process Multiple Requests Efficiently

Batch processing can improve throughput by 5-10x through concurrency, and OpenAI's Batch API offers a 50% discount on workloads that can tolerate up to a 24-hour turnaround.

OpenAI Batch API with LiteLLM

import litellm
from typing import List, Dict
import asyncio
import time

class BatchProcessor:
    def __init__(self, model: str = "gpt-4o-mini"):
        self.model = model

    async def process_batch_async(self, messages_list: List[List[Dict]]) -> List:
        """Process multiple requests concurrently"""

        tasks = []
        for messages in messages_list:
            # Create async task for each request
            task = litellm.acompletion(
                model=self.model,
                messages=messages,
                metadata={"batch": True}
            )
            tasks.append(task)

        # Execute all requests concurrently
        start_time = time.time()
        responses = await asyncio.gather(*tasks)
        elapsed = time.time() - start_time

        print(f"Processed {len(messages_list)} requests in {elapsed:.2f}s")
        print(f"Average time per request: {elapsed/len(messages_list):.2f}s")

        return responses

    def process_batch_sync(self, messages_list: List[List[Dict]]) -> List:
        """Synchronous batch processing with OpenAI Batch API"""

        # For OpenAI's batch endpoint (50% discount)
        batch_requests = []
        for i, messages in enumerate(messages_list):
            batch_requests.append({
                "custom_id": f"request-{i}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": self.model,
                    "messages": messages
                }
            })

        # Note: OpenAI Batch API requires file upload
        # This is a simplified example
        import json

        # Save to JSONL file
        with open("batch_requests.jsonl", "w") as f:
            for req in batch_requests:
                f.write(json.dumps(req) + "\n")

        print(f"Prepared {len(batch_requests)} requests for batch processing")
        print("With OpenAI Batch API: 50% discount, 24-hour turnaround")

        return batch_requests

# Usage Example
async def main():
    processor = BatchProcessor()

    # Prepare multiple requests
    queries = [
        "What is the capital of France?",
        "Explain quantum computing",
        "Write a haiku about coding",
        "Translate 'hello' to Spanish",
        "What's 25 * 4?"
    ]

    messages_list = [
        [{"role": "user", "content": query}]
        for query in queries
    ]

    # Method 1: Async concurrent processing (fast)
    responses = await processor.process_batch_async(messages_list)

    # Calculate batch efficiency
    total_tokens = sum(r.usage.total_tokens for r in responses)
    total_cost = sum(litellm.completion_cost(completion_response=r) for r in responses)

    print(f"\nBatch Statistics:")
    print(f"Total tokens: {total_tokens}")
    print(f"Total cost: ${total_cost:.4f}")
    print(f"Average cost per request: ${total_cost/len(queries):.4f}")

# Run the batch
asyncio.run(main())
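
The process_batch_sync method above only prepares the JSONL file. To actually submit it, a minimal sketch with the official OpenAI Python SDK could look like this (assumes OPENAI_API_KEY is set; check the Batch API docs for current parameters):

from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

# 1. Upload the JSONL file prepared by process_batch_sync
batch_file = client.files.create(
    file=open("batch_requests.jsonl", "rb"),
    purpose="batch"
)

# 2. Create the batch job (50% discount, results within 24 hours)
batch_job = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h"
)

print(f"Batch {batch_job.id} submitted, status: {batch_job.status}")

# 3. Later: poll for completion and download results
# job = client.batches.retrieve(batch_job.id)
# if job.status == "completed":
#     results = client.files.content(job.output_file_id).text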

Smart Batching Strategy

import litellm
from collections import defaultdict
from typing import List, Dict, Any
import asyncio

class SmartBatcher:
    def __init__(self,
                 batch_size: int = 10,
                 max_wait_time: float = 1.0,
                 model: str = "gpt-4o-mini"):
        self.batch_size = batch_size
        self.max_wait_time = max_wait_time
        self.model = model
        self.pending_requests = []
        self.results = {}

    async def add_request(self,
                          request_id: str,
                          messages: List[Dict],
                          priority: int = 0) -> Dict:
        """Add request to batch queue"""

        request = {
            "id": request_id,
            "messages": messages,
            "priority": priority,
            "timestamp": asyncio.get_event_loop().time()
        }

        self.pending_requests.append(request)

        # Check if we should process the batch now.
        # Note: this simplified version only checks on new arrivals; a production
        # implementation would also run a background task to flush stale batches.
        should_process = (
            len(self.pending_requests) >= self.batch_size or
            (self.pending_requests and
             asyncio.get_event_loop().time() - self.pending_requests[0]["timestamp"] > self.max_wait_time)
        )

        if should_process:
            await self._process_batch()

        # Wait for result
        while request_id not in self.results:
            await asyncio.sleep(0.1)

        return self.results.pop(request_id)

    async def _process_batch(self):
        """Process accumulated requests"""

        # Sort by priority and take batch_size
        batch = sorted(self.pending_requests,
                      key=lambda x: -x["priority"])[:self.batch_size]

        # Remove from pending
        for req in batch:
            self.pending_requests.remove(req)

        print(f"Processing batch of {len(batch)} requests")

        # Process concurrently
        tasks = []
        for req in batch:
            task = litellm.acompletion(
                model=self.model,
                messages=req["messages"],
                metadata={"batch_id": req["id"]}
            )
            tasks.append(task)

        responses = await asyncio.gather(*tasks)

        # Store results
        for req, response in zip(batch, responses):
            self.results[req["id"]] = response

        # Log batch performance
        total_cost = sum(
            litellm.completion_cost(completion_response=r)
            for r in responses
        )
        print(f"Batch completed. Total cost: ${total_cost:.4f}")

# Example: Batching user requests in a web service
async def handle_user_request(batcher: SmartBatcher, user_id: str, query: str):
    """Handle individual user request with batching"""

    messages = [{"role": "user", "content": query}]

    # High priority for premium users
    priority = 10 if user_id.startswith("premium") else 0

    response = await batcher.add_request(
        request_id=f"{user_id}-{asyncio.get_event_loop().time()}",
        messages=messages,
        priority=priority
    )

    return response

# Simulate multiple users
async def simulate_traffic():
    batcher = SmartBatcher(batch_size=5, max_wait_time=0.5)

    # Simulate 20 concurrent users
    tasks = []
    for i in range(20):
        user_id = f"premium-{i}" if i < 5 else f"user-{i}"
        query = f"Question {i}: What is {i} * {i}?"

        task = handle_user_request(batcher, user_id, query)
        tasks.append(task)

        # Stagger requests slightly
        await asyncio.sleep(0.1)

    responses = await asyncio.gather(*tasks)
    print(f"Processed {len(responses)} requests in batches")

# Run simulation
asyncio.run(simulate_traffic())


7. Semantic Caching: Handle Similar Queries

Unlike exact-match caching, semantic caching handles paraphrased or similar queries.

import litellm
import numpy as np
from typing import Optional, List, Dict
import redis
import json
import hashlib
import asyncio

class SemanticCache:
    def __init__(self,
                 similarity_threshold: float = 0.95,
                 redis_client: Optional[redis.Redis] = None):
        self.threshold = similarity_threshold
        self.redis = redis_client or redis.Redis()
        self.embedding_model = "text-embedding-3-small"

    async def get_embedding(self, text: str) -> List[float]:
        """Get embedding for text"""
        response = await litellm.aembedding(
            model=self.embedding_model,
            input=text
        )
        return response.data[0].embedding

    def cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
        """Calculate cosine similarity between two vectors"""
        vec1, vec2 = np.array(vec1), np.array(vec2)
        return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

    async def get(self, messages: List[Dict], model: str) -> Optional[Dict]:
        """Check semantic cache for similar query"""

        # Generate embedding for current query
        query_text = json.dumps(messages)
        query_embedding = await self.get_embedding(query_text)

        # Get all cached embeddings
        # (KEYS scans every key; fine for a demo, but use SCAN or a vector index in production)
        pattern = f"semantic_cache:{model}:*"
        cached_keys = self.redis.keys(pattern)

        best_match = None
        best_similarity = 0

        for key in cached_keys:
            cached_data = json.loads(self.redis.get(key))
            cached_embedding = cached_data["embedding"]

            # Calculate similarity
            similarity = self.cosine_similarity(query_embedding, cached_embedding)

            if similarity > best_similarity and similarity >= self.threshold:
                best_similarity = similarity
                best_match = cached_data["response"]

                print(f"Semantic cache hit! Similarity: {similarity:.3f}")

        return best_match

    async def set(self,
                  messages: List[Dict],
                  model: str,
                  response: Dict,
                  ttl: int = 3600):
        """Store in semantic cache"""

        query_text = json.dumps(messages)
        query_embedding = await self.get_embedding(query_text)

        # Create cache key
        cache_key = f"semantic_cache:{model}:{hashlib.md5(query_text.encode()).hexdigest()}"

        # Store with embedding
        cache_data = {
            "messages": messages,
            "embedding": query_embedding,
            "response": response,
            "model": model
        }

        self.redis.setex(cache_key, ttl, json.dumps(cache_data))

class SemanticCacheLLM:
    def __init__(self):
        self.semantic_cache = SemanticCache(similarity_threshold=0.93)

    async def completion(self, messages: List[Dict], model: str = "gpt-4o-mini", **kwargs):
        """LLM completion with semantic caching"""

        # Try semantic cache first
        cached = await self.semantic_cache.get(messages, model)
        if cached:
            print("✨ Semantic cache hit!")
            return cached

        # Make actual request
        response = await litellm.acompletion(
            model=model,
            messages=messages,
            **kwargs
        )

        # Store in semantic cache
        await self.semantic_cache.set(
            messages,
            model,
            response.model_dump()
        )

        return response

# Test semantic caching
async def test_semantic_cache():
    llm = SemanticCacheLLM()

    # Similar queries that should hit cache
    queries = [
        "What's the capital of France?",
        "Tell me the capital city of France",
        "What is France's capital?",
        "France capital?",
        "Which city is the capital of France?"
    ]

    for i, query in enumerate(queries):
        print(f"\nQuery {i+1}: {query}")

        response = await llm.completion(
            messages=[{"role": "user", "content": query}]
        )

        if i == 0:
            print("Initial query - stored in cache")
        # Subsequent, similar queries should hit the semantic cache

asyncio.run(test_semantic_cache())

8. Streaming Optimizations: Improve Perceived Performance

Streaming reduces time-to-first-token (TTFT) and improves user experience.

import litellm
import asyncio
import time
from typing import AsyncIterator

class StreamingOptimizer:
    def __init__(self):
        self.first_token_times = []

    async def stream_completion(self,
                               messages: list,
                               model: str = "gpt-4o-mini") -> AsyncIterator[str]:
        """Stream completion with performance tracking"""

        start_time = time.time()
        first_token_time = None
        full_response = ""

        # Create streaming request
        response = await litellm.acompletion(
            model=model,
            messages=messages,
            stream=True,
            metadata={"streaming": True}
        )

        async for chunk in response:
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content

                # Track time to first token
                if first_token_time is None:
                    first_token_time = time.time() - start_time
                    self.first_token_times.append(first_token_time)
                    print(f"⚡ First token in {first_token_time:.3f}s")

                full_response += content
                yield content

        # Calculate metrics
        total_time = time.time() - start_time
        avg_ttft = sum(self.first_token_times) / len(self.first_token_times)

        print(f"\nStreaming metrics:")
        print(f"Total time: {total_time:.2f}s")
        print(f"First token: {first_token_time:.3f}s")
        print(f"Avg TTFT: {avg_ttft:.3f}s")

    async def parallel_stream(self,
                            queries: list,
                            model: str = "gpt-4o-mini") -> dict:
        """Stream multiple requests in parallel"""

        async def process_stream(query_id: str, messages: list):
            """Process individual stream"""
            result = {
                "id": query_id,
                "chunks": [],
                "first_token_time": None,
                "total_time": None
            }

            start_time = time.time()

            async for chunk in self.stream_completion(messages, model):
                if result["first_token_time"] is None:
                    result["first_token_time"] = time.time() - start_time
                result["chunks"].append(chunk)

            result["total_time"] = time.time() - start_time
            result["response"] = "".join(result["chunks"])

            return result

        # Create tasks for parallel streaming
        tasks = []
        for i, query in enumerate(queries):
            messages = [{"role": "user", "content": query}]
            task = process_stream(f"query-{i}", messages)
            tasks.append(task)

        # Process all streams in parallel
        results = await asyncio.gather(*tasks)

        # Analyze performance
        avg_first_token = sum(r["first_token_time"] for r in results) / len(results)
        avg_total = sum(r["total_time"] for r in results) / len(results)

        print(f"\nParallel streaming {len(queries)} requests:")
        print(f"Average first token: {avg_first_token:.3f}s")
        print(f"Average total time: {avg_total:.2f}s")

        return results

# Example: Stream with buffering for smooth output
async def smooth_streaming_demo():
    """Demonstrate smooth streaming with buffer"""

    optimizer = StreamingOptimizer()

    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Write a short story about a robot learning to paint."}
    ]

    print("Streaming response:")
    print("-" * 50)

    buffer = []
    async for chunk in optimizer.stream_completion(messages):
        buffer.append(chunk)

        # Output in word chunks for smoother display
        if len(buffer) > 5 or chunk.endswith(('.', '!', '?')):
            print("".join(buffer), end="", flush=True)
            buffer = []
            await asyncio.sleep(0.01)  # Small delay for readability

    # Flush remaining buffer
    if buffer:
        print("".join(buffer))

asyncio.run(smooth_streaming_demo())

9. Fine-tuning Economics: When to Fine-tune vs Prompt

Fine-tuning can reduce costs for repetitive, specialized tasks.

class FineTuningAnalyzer:
    """Analyze when fine-tuning is cost-effective"""

    def __init__(self):
        self.costs = {
            # Per 1M tokens
            "gpt-4o": {"input": 2.50, "output": 10.00},
            "gpt-4o-mini": {"input": 0.15, "output": 0.60},
            "gpt-3.5-turbo": {"input": 0.50, "output": 1.50},
            "fine-tuned-gpt-3.5": {"input": 3.00, "output": 6.00},

            # Fine-tuning costs
            "training": 8.00,  # per 1M tokens
        }

    def calculate_breakeven(self,
                           base_model: str,
                           prompt_tokens: int,
                           completion_tokens: int,
                           training_tokens: int,
                           monthly_requests: int) -> dict:
        """Calculate fine-tuning break-even point"""

        # Cost per request with base model + long prompt
        base_cost_per_request = (
            (prompt_tokens * self.costs[base_model]["input"] / 1_000_000) +
            (completion_tokens * self.costs[base_model]["output"] / 1_000_000)
        )

        # Cost per request with fine-tuned model (shorter prompt)
        # Assume 80% prompt reduction after fine-tuning
        reduced_prompt_tokens = prompt_tokens * 0.2
        ft_cost_per_request = (
            (reduced_prompt_tokens * self.costs["fine-tuned-gpt-3.5"]["input"] / 1_000_000) +
            (completion_tokens * self.costs["fine-tuned-gpt-3.5"]["output"] / 1_000_000)
        )

        # Training cost (one-time)
        training_cost = training_tokens * self.costs["training"] / 1_000_000

        # Monthly costs
        monthly_base = base_cost_per_request * monthly_requests
        monthly_ft = ft_cost_per_request * monthly_requests
        monthly_savings = monthly_base - monthly_ft

        # Break-even point
        if monthly_savings > 0:
            breakeven_months = training_cost / monthly_savings
        else:
            breakeven_months = float('inf')

        return {
            "base_cost_per_request": base_cost_per_request,
            "ft_cost_per_request": ft_cost_per_request,
            "monthly_base_cost": monthly_base,
            "monthly_ft_cost": monthly_ft,
            "monthly_savings": monthly_savings,
            "training_cost": training_cost,
            "breakeven_months": breakeven_months,
            "recommendation": "Fine-tune" if breakeven_months < 3 else "Use prompting"
        }

    def analyze_use_case(self, use_case: dict) -> dict:
        """Analyze specific use case for fine-tuning viability"""

        analysis = self.calculate_breakeven(**use_case)

        print(f"\n🔍 Fine-tuning Analysis: {use_case.get('name', 'Use Case')}")
        print("=" * 50)
        print(f"Current cost per request: ${analysis['base_cost_per_request']:.4f}")
        print(f"Fine-tuned cost per request: ${analysis['ft_cost_per_request']:.4f}")
        print(f"Monthly savings: ${analysis['monthly_savings']:.2f}")
        print(f"Training cost: ${analysis['training_cost']:.2f}")
        print(f"Break-even: {analysis['breakeven_months']:.1f} months")
        print(f"Recommendation: {analysis['recommendation']}")

        return analysis

# Analyze different scenarios
analyzer = FineTuningAnalyzer()

use_cases = [
    {
        "name": "Customer Support Classifier",
        "base_model": "gpt-4o",
        "prompt_tokens": 2000,  # Long prompt with examples
        "completion_tokens": 50,  # Short classification
        "training_tokens": 100_000,  # Training dataset
        "monthly_requests": 50_000
    },
    {
        "name": "Code Review Assistant",
        "base_model": "gpt-4o",
        "prompt_tokens": 5000,  # Coding standards + context
        "completion_tokens": 500,  # Detailed review
        "training_tokens": 500_000,
        "monthly_requests": 10_000
    },
    {
        "name": "Simple Data Extraction",
        "base_model": "gpt-4o-mini",
        "prompt_tokens": 500,
        "completion_tokens": 100,
        "training_tokens": 50_000,
        "monthly_requests": 100_000
    }
]

for use_case in use_cases:
    analyzer.analyze_use_case(use_case)

10. Complete Production Setup

Here's everything combined into a production-ready optimization system:

import litellm
from litellm import Cache
import asyncio
from typing import List, Dict, Optional
import redis
from enum import Enum
from dataclasses import dataclass
import time

@dataclass
class OptimizationConfig:
    """Configuration for LLM optimizations"""
    enable_caching: bool = True
    enable_semantic_cache: bool = True
    enable_prompt_caching: bool = True
    enable_batching: bool = True
    enable_streaming: bool = False
    batch_size: int = 10
    cache_ttl: int = 3600
    semantic_threshold: float = 0.93

class OptimizedLLMSystem:
    def __init__(self, config: OptimizationConfig = OptimizationConfig()):
        self.config = config

        # Initialize caching
        if config.enable_caching:
            litellm.cache = Cache(type="redis")

        # Initialize components
        # (ModelRouter is the smart model selection router from Part 1)
        self.router = ModelRouter()
        self.batcher = SmartBatcher() if config.enable_batching else None
        self.semantic_cache = SemanticCache() if config.enable_semantic_cache else None

        # Metrics
        self.metrics = {
            "total_requests": 0,
            "cache_hits": 0,
            "total_cost": 0,
            "total_tokens": 0,
            "model_usage": {}
        }

    async def complete(self,
                      messages: List[Dict],
                      user_id: Optional[str] = None,
                      **kwargs) -> Dict:
        """
        Optimized completion with all techniques
        """

        self.metrics["total_requests"] += 1

        # 1. Check semantic cache
        if self.config.enable_semantic_cache:
            cached = await self.semantic_cache.get(messages, "auto")
            if cached:
                self.metrics["cache_hits"] += 1
                print("✨ Semantic cache hit!")
                return cached

        # 2. Determine model
        model = self.router.route(messages)

        # 3. Apply prompt caching if applicable
        if self.config.enable_prompt_caching and self._should_use_prompt_cache(messages):
            messages = self._prepare_prompt_cache(messages)

        # 4. Batch or stream based on config
        if self.config.enable_streaming:
            response = await self._stream_completion(model, messages, **kwargs)
        elif self.config.enable_batching and self.batcher:
            response = await self.batcher.add_request(
                request_id=f"{user_id}-{time.time()}",
                messages=messages,
                priority=self._get_priority(user_id)
            )
        else:
            response = await litellm.acompletion(
                model=model,
                messages=messages,
                caching=self.config.enable_caching,
                **kwargs
            )

        # 5. Update metrics
        self._update_metrics(response, model)

        # 6. Store in semantic cache under the same "auto" namespace used for the
        # lookup in step 1 (the routed model isn't known at lookup time)
        if self.config.enable_semantic_cache:
            await self.semantic_cache.set(messages, "auto", response.model_dump())

        return response

    def _should_use_prompt_cache(self, messages: List[Dict]) -> bool:
        """Determine if prompt caching should be used"""
        # Check if system message is long enough
        if messages and messages[0].get("role") == "system":
            content = messages[0].get("content", "")
            return len(content) > 1000
        return False

    def _prepare_prompt_cache(self, messages: List[Dict]) -> List[Dict]:
        """Prepare messages for prompt caching"""
        if messages and messages[0].get("role") == "system":
            # Add cache control for Anthropic
            if isinstance(messages[0]["content"], str):
                messages[0]["content"] = [
                    {
                        "type": "text",
                        "text": messages[0]["content"],
                        "cache_control": {"type": "ephemeral"}
                    }
                ]
        return messages

    async def _stream_completion(self, model: str, messages: List[Dict], **kwargs):
        """Stream completion"""
        response_chunks = []

        stream = await litellm.acompletion(
            model=model,
            messages=messages,
            stream=True,
            **kwargs
        )

        async for chunk in stream:
            response_chunks.append(chunk)

        # Combine chunks into a single response object
        # (litellm's stream_chunk_builder reassembles streamed chunks)
        return litellm.stream_chunk_builder(response_chunks, messages=messages)

    def _get_priority(self, user_id: Optional[str]) -> int:
        """Get request priority based on user"""
        if not user_id:
            return 0
        if "premium" in user_id:
            return 10
        if "enterprise" in user_id:
            return 20
        return 0

    def _update_metrics(self, response, model: str):
        """Update system metrics"""
        cost = litellm.completion_cost(completion_response=response)
        tokens = response.usage.total_tokens

        self.metrics["total_cost"] += cost
        self.metrics["total_tokens"] += tokens
        self.metrics["model_usage"][model] = self.metrics["model_usage"].get(model, 0) + 1

    def get_metrics_report(self) -> Dict:
        """Generate metrics report"""
        cache_rate = (self.metrics["cache_hits"] / max(self.metrics["total_requests"], 1)) * 100
        avg_cost = self.metrics["total_cost"] / max(self.metrics["total_requests"], 1)

        return {
            "total_requests": self.metrics["total_requests"],
            "cache_hit_rate": f"{cache_rate:.1f}%",
            "total_cost": f"${self.metrics['total_cost']:.2f}",
            "average_cost_per_request": f"${avg_cost:.4f}",
            "total_tokens": self.metrics["total_tokens"],
            "model_distribution": self.metrics["model_usage"]
        }

# Usage Example
async def production_demo():
    """Demonstrate production optimization system"""

    # Configure optimizations
    config = OptimizationConfig(
        enable_caching=True,
        enable_semantic_cache=True,
        enable_prompt_caching=True,
        enable_batching=True,
        enable_streaming=False,
        batch_size=5
    )

    system = OptimizedLLMSystem(config)

    # Simulate various requests
    test_requests = [
        {
            "user_id": "premium-user-1",
            "messages": [
                {"role": "system", "content": "You are a helpful assistant. " * 100},
                {"role": "user", "content": "What is the capital of France?"}
            ]
        },
        {
            "user_id": "user-2",
            "messages": [
                {"role": "system", "content": "You are a helpful assistant. " * 100},
                {"role": "user", "content": "What's the capital city of France?"}  # Similar
            ]
        },
        {
            "user_id": "enterprise-3",
            "messages": [
                {"role": "user", "content": "Write a complex algorithm for sorting"}
            ]
        }
    ]

    # Process requests
    for req in test_requests:
        response = await system.complete(
            messages=req["messages"],
            user_id=req["user_id"]
        )
        print(f"Processed request for {req['user_id']}")

    # Generate report
    print("\n📊 Optimization Report")
    print("=" * 50)
    report = system.get_metrics_report()
    for key, value in report.items():
        print(f"{key}: {value}")

# Run the demo
asyncio.run(production_demo())

Results & Impact

After implementing all optimizations:

Technique        | Impact                               | When to Use
Prompt Caching   | 90% cost reduction on context        | Long, repeated system prompts
Batch Processing | 5-10x throughput, 50% cost (offline) | Bulk operations, async workflows
Semantic Caching | 30-40% more cache hits               | FAQ, support queries
Streaming        | 80% faster perceived response        | User-facing applications
Fine-tuning      | 60-80% cost reduction                | Repetitive, specialized tasks

Combined Impact

Before optimizations:

  • Cost per 1000 requests: $350
  • Average latency: 2.5s
  • Cache hit rate: 0%
  • Throughput: 100 requests/minute

After optimizations:

  • Cost per 1000 requests: $28 (92% reduction)
  • Average latency: 0.4s (84% faster)
  • Cache hit rate: 65%
  • Throughput: 1000 requests/minute (10x)
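
As a quick sanity check, the headline percentages follow directly from the before/after figures above:

# Sanity check on the headline numbers
cost_before, cost_after = 350, 28          # $ per 1,000 requests
latency_before, latency_after = 2.5, 0.4   # seconds

print(f"Cost reduction:    {(1 - cost_after / cost_before) * 100:.0f}%")        # ~92%
print(f"Latency reduction: {(1 - latency_after / latency_before) * 100:.0f}%")  # ~84%
print(f"Throughput gain:   {1000 / 100:.0f}x")                                  # 100 -> 1000 req/min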

Decision Framework

def optimization_decision_tree(use_case: dict) -> list:
    """Recommend optimizations based on use case"""

    recommendations = []

    # Always start with observability
    recommendations.append("1. Enable Langfuse observability")

    # High volume → Batching
    if use_case["daily_requests"] > 10000:
        recommendations.append("2. Implement batch processing")

    # Repetitive queries → Caching
    if use_case["query_similarity"] > 0.5:
        recommendations.append("3. Add response caching")
        if use_case["query_similarity"] > 0.7:
            recommendations.append("4. Add semantic caching")

    # Long contexts → Prompt caching
    if use_case["avg_prompt_tokens"] > 1000:
        recommendations.append("5. Enable prompt caching")

    # User-facing → Streaming
    if use_case["user_facing"]:
        recommendations.append("6. Implement streaming")

    # Specialized task → Fine-tuning
    if use_case["task_specificity"] > 0.8 and use_case["monthly_requests"] > 50000:
        recommendations.append("7. Consider fine-tuning")

    return recommendations

# Example analysis
use_case = {
    "daily_requests": 50000,
    "query_similarity": 0.8,
    "avg_prompt_tokens": 2000,
    "user_facing": True,
    "task_specificity": 0.9,
    "monthly_requests": 1500000
}

recommendations = optimization_decision_tree(use_case)
print("Optimization Roadmap:")
for rec in recommendations:
    print(rec)

Key Takeaways

  1. Stack optimizations - Each technique compounds the others
  2. Measure everything - You can't optimize what you don't track
  3. Start simple - Basic caching gives the biggest initial win
  4. Think in systems - Combine techniques for maximum impact
  5. Profile your workload - Different patterns need different optimizations

What's Next?

You now have a complete toolkit for optimizing LLM costs and performance. Start with:

Week 1: Implement Part 1 basics (observability, caching, model selection)
Week 2: Add prompt caching and batch processing
Week 3: Test semantic caching for your use case
Month 2: Evaluate fine-tuning opportunities

Remember: The best optimization is the one that ships. Start with the basics, measure the impact, then layer on advanced techniques.


Have you implemented these optimizations? What worked best for your use case? Share your results in the comments!