Caching Strategies Cho Web Scraping: Tiết Kiệm Bandwidth

Trở lại Tin tức
Tin tức

Caching Strategies Cho Web Scraping: Tiết Kiệm Bandwidth

Caching giúp scrape nhanh hơn và tiết kiệm tài nguyên. Bài viết này hướng dẫn các chiến lược caching hiệu quả cho web scraping.

Tại Sao Cần Caching?

  • Tiết kiệm bandwidth: Không fetch lại data đã có
  • Faster development: Test parsing mà không cần request
  • Respect servers: Giảm load lên target
  • Retry-friendly: Resume failed scrapes

1. File-based Cache

import hashlib
import os
import time

import requests

# Directory that holds one cached HTML file per URL; created at import time.
CACHE_DIR = 'cache'
os.makedirs(CACHE_DIR, exist_ok=True)

def get_cache_path(url):
    """Return the cache file path for ``url``.

    The file name is the hex MD5 digest of the encoded URL followed by
    ``.html``, located inside ``CACHE_DIR``.
    """
    digest = hashlib.md5(url.encode())
    file_name = digest.hexdigest() + ".html"
    return os.path.join(CACHE_DIR, file_name)

def fetch_with_cache(url, max_age_hours=24, timeout=30):
    """Return the body of ``url``, served from the file cache when fresh.

    Args:
        url: Address to fetch.
        max_age_hours: A cached copy older than this many hours is refetched.
        timeout: Seconds passed to ``requests.get`` so a stalled server
            cannot block the scraper forever.

    Returns:
        The response body as text (from the cache file or from the network).
    """
    cache_path = get_cache_path(url)

    # Check cache. Asking for the mtime directly (instead of exists() followed
    # by getmtime()) avoids a crash if the file is removed in between.
    try:
        age = time.time() - os.path.getmtime(cache_path)
    except OSError:
        age = None  # no cached copy

    if age is not None and age < max_age_hours * 3600:
        try:
            with open(cache_path, 'r', encoding='utf-8') as f:
                return f.read()
        except (OSError, UnicodeDecodeError):
            # Unreadable, or written earlier in another encoding:
            # treat it as a miss and refetch below.
            pass

    # Fetch and cache
    response = requests.get(url, timeout=timeout)
    if response.ok:
        # Only successful responses are stored; a cached error page would be
        # served again and again until the entry expired.
        with open(cache_path, 'w', encoding='utf-8') as f:
            f.write(response.text)

    return response.text

2. SQLite Cache

import sqlite3
from datetime import datetime, timedelta

class SQLiteCache:
    """URL -> page content cache stored in a SQLite database.

    Every row keeps the content together with the ISO-8601 time it was
    stored. The object also works as a context manager, closing the
    connection when the ``with`` block ends.
    """

    def __init__(self, db_path='cache.db'):
        """Open (or create) the database at ``db_path`` and ensure the table exists."""
        self.conn = sqlite3.connect(db_path)
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS cache (
                url TEXT PRIMARY KEY,
                content TEXT,
                fetched_at TIMESTAMP
            )
        ''')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying database connection."""
        self.conn.close()

    def get(self, url, max_age_hours=24):
        """Return the cached content for ``url`` or None.

        None is returned when there is no row for ``url`` or when the row
        is ``max_age_hours`` old or older.
        """
        row = self.conn.execute(
            'SELECT content, fetched_at FROM cache WHERE url = ?',
            (url,)
        ).fetchone()
        if row is None:
            return None

        content, fetched_at = row
        # astimezone() makes both datetimes timezone-aware, so the difference
        # is real elapsed time even across a DST or UTC-offset change. A naive
        # timestamp written by an earlier version is read as local time.
        fetched = datetime.fromisoformat(fetched_at).astimezone()
        age = datetime.now().astimezone() - fetched
        if age < timedelta(hours=max_age_hours):
            return content
        return None

    def set(self, url, content):
        """Insert or replace the row for ``url`` with ``content`` and the current time."""
        stamp = datetime.now().astimezone().isoformat()
        # The connection context manager commits on success and rolls back
        # if the statement raises.
        with self.conn:
            self.conn.execute(
                'INSERT OR REPLACE INTO cache (url, content, fetched_at) VALUES (?, ?, ?)',
                (url, content, stamp)
            )

# Usage
cache = SQLiteCache()
content = cache.get(url)
if not content:
    # Store only on a miss: writing a cache hit back would reset fetched_at
    # and keep a regularly read entry from ever expiring.
    content = requests.get(url).text
    cache.set(url, content)

3. Redis Cache

import redis
import json

class RedisCache:
    """Thin wrapper around a Redis client that caches page content by URL."""

    def __init__(self, host='localhost', port=6379):
        """Create the Redis client for ``host`` and ``port``."""
        self.r = redis.Redis(host=host, port=port)

    def get(self, url):
        """Return the decoded value stored under ``url``.

        None is returned when the lookup yields nothing or an empty value.
        """
        raw = self.r.get(url)
        if not raw:
            return None
        return raw.decode()

    def set(self, url, content, ttl_seconds=86400):
        """Store ``content`` under ``url`` via SETEX with ``ttl_seconds``."""
        self.r.setex(url, ttl_seconds, content)

# Usage: look in Redis first and go to the network only on a miss.
cache = RedisCache()
content = cache.get(url)
if not content:
    # Download the page, then store it with a 3600-second TTL.
    response = requests.get(url)
    content = response.text
    cache.set(url, content, ttl_seconds=3600)

4. Requests-cache Library

import requests_cache

# Install: pip install requests-cache

# Install a cache named 'scraper_cache' for all requests calls
# (sqlite backend, expire_after=3600, i.e. 1 hour).
requests_cache.install_cache('scraper_cache', backend='sqlite', expire_after=3600)

# From here on every request goes through the cache
response = requests.get('https://example.com')  # Fetches
response = requests.get('https://example.com')  # From cache!

# from_cache tells whether the response came from the cache
print(f"From cache: {response.from_cache}")

5. Conditional Requests

def fetch_if_modified(url, last_modified=None, etag=None, timeout=30):
    """Fetch ``url`` with a conditional GET.

    Args:
        url: Address to fetch.
        last_modified: Value sent as ``If-Modified-Since`` when truthy.
        etag: Value sent as ``If-None-Match`` when truthy.
        timeout: Seconds passed to ``requests.get`` so a stalled server
            cannot block the caller forever.

    Returns:
        None when the server answers 304 (the caller should keep using its
        cached copy). Otherwise a tuple ``(text, last_modified, etag)``
        where the last two come from the ``Last-Modified`` and ``ETag``
        response headers and are None when the header is absent.
    """
    headers = {}
    if last_modified:
        headers['If-Modified-Since'] = last_modified
    if etag:
        headers['If-None-Match'] = etag

    response = requests.get(url, headers=headers, timeout=timeout)

    if response.status_code == 304:
        print("Not modified, use cached version")
        return None

    # Hand the validators back so the caller can save them for next time
    return (
        response.text,
        response.headers.get('Last-Modified'),
        response.headers.get('ETag'),
    )

6. Incremental Scraping

def scrape_new_only(urls, scraped_file='scraped.txt'):
    """Scrape only the URLs that are not yet recorded in ``scraped_file``.

    Args:
        urls: Iterable of URLs to consider; repeated URLs are scraped once.
        scraped_file: Text file with one already-scraped URL per line. Each
            URL is appended to it right after it has been scraped and saved.
    """
    urls = list(urls)  # allow any iterable; len() is needed below

    # Load already scraped
    scraped = set()
    if os.path.exists(scraped_file):
        with open(scraped_file, 'r') as f:
            scraped = set(f.read().splitlines())

    # Scrape only new URLs; dict.fromkeys drops repeats while keeping order
    new_urls = [u for u in dict.fromkeys(urls) if u not in scraped]
    # Count what is really skipped from this call, not the whole history file
    skipped = len(urls) - len(new_urls)
    print(f"Scraping {len(new_urls)} new URLs (skipping {skipped})")

    if not new_urls:
        return

    with open(scraped_file, 'a') as f:
        for url in new_urls:
            data = scrape(url)
            save(data)

            # Mark as scraped; flush so progress survives an interrupted run
            f.write(url + '\n')
            f.flush()

Cache Invalidation

# Clear old cache entries
def cleanup_cache(cache_dir, max_age_days=7):
    """Delete files in ``cache_dir`` last modified more than ``max_age_days`` days ago.

    Entries that are not files (for example subdirectories) are left
    untouched, and a file that disappears while the directory is being
    processed is skipped.
    """
    cutoff = time.time() - (max_age_days * 86400)

    for filename in os.listdir(cache_dir):
        filepath = os.path.join(cache_dir, filename)
        if not os.path.isfile(filepath):
            continue  # os.remove cannot delete directories
        try:
            if os.path.getmtime(filepath) < cutoff:
                os.remove(filepath)
                print(f"Removed: {filename}")
        except FileNotFoundError:
            # Removed by someone else in the meantime; nothing left to do.
            continue

VinaProxy + Caching

  • Giảm bandwidth usage = giảm chi phí
  • Faster scraping với cache hits
  • Giá chỉ $0.5/GB

Dùng Thử Ngay →