Data Pipelines for Scraping: Automated ETL

Large-scale scraping needs a data pipeline. This article walks through building an automated ETL pipeline for scraped data.

What Is ETL?

  • Extract: collect data from the sources
  • Transform: clean, normalize, enrich
  • Load: store it in a database or warehouse

Basic Pipeline Structure

class ScrapingPipeline:
    def __init__(self):
        self.extractors = []
        self.transformers = []
        self.loaders = []
    
    def add_extractor(self, extractor):
        self.extractors.append(extractor)
    
    def add_transformer(self, transformer):
        self.transformers.append(transformer)
    
    def add_loader(self, loader):
        self.loaders.append(loader)
    
    def run(self, urls):
        # Extract: every extractor visits every URL
        raw_data = []
        for extractor in self.extractors:
            for url in urls:
                data = extractor.extract(url)
                raw_data.append(data)
        
        # Transform: apply transformers in order; a transformer
        # can drop a record by returning None (see DuplicateRemover)
        for transformer in self.transformers:
            raw_data = [transformer.transform(d) for d in raw_data]
            raw_data = [d for d in raw_data if d is not None]
        
        # Load
        for loader in self.loaders:
            loader.load(raw_data)
        
        return raw_data
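
Wiring it together might look like this. The extractor, transformer, and loader classes are the ones defined in the sections below; the URLs are placeholders:

pipeline = ScrapingPipeline()
pipeline.add_extractor(ProductExtractor())
pipeline.add_transformer(PriceTransformer())
pipeline.add_transformer(TextCleaner())
pipeline.add_transformer(DuplicateRemover())
pipeline.add_loader(CSVLoader('products.csv'))

# example.com URLs stand in for real product pages
results = pipeline.run([
    'https://example.com/p/1',
    'https://example.com/p/2',
])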

Extractors

import requests
from bs4 import BeautifulSoup
from datetime import datetime

class ProductExtractor:
    def extract(self, url):
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'lxml')
        
        return {
            'url': url,
            'name': soup.select_one('.name').text,
            'price': soup.select_one('.price').text,
            'raw_html': response.text,
            'extracted_at': datetime.now().isoformat()
        }

class APIExtractor:
    def extract(self, endpoint):
        response = requests.get(endpoint, timeout=10)
        response.raise_for_status()
        return response.json()
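
Large crawls tend to hit rate limits and IP bans, so extraction is often routed through a proxy pool. A minimal sketch under that assumption; ProxyExtractor is our name, and proxy_url is whatever gateway your provider gives you:

class ProxyExtractor:
    # Sketch: same interface as the extractors above, but every
    # request goes through a rotating proxy gateway
    def __init__(self, proxy_url):
        self.proxies = {'http': proxy_url, 'https': proxy_url}
    
    def extract(self, url):
        response = requests.get(url, proxies=self.proxies, timeout=10)
        response.raise_for_status()
        return {
            'url': url,
            'raw_html': response.text,
            'extracted_at': datetime.now().isoformat()
        }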

Transformers

import re

class PriceTransformer:
    def transform(self, data):
        price_str = data.get('price', '')
        # "1,234,000đ" -> 1234000
        digits = re.sub(r'[^\d]', '', price_str)
        data['price'] = int(digits) if digits else None
        return data

class TextCleaner:
    def transform(self, data):
        for key in ['name', 'description']:
            if key in data:
                data[key] = data[key].strip()
                data[key] = ' '.join(data[key].split())
        return data

class DuplicateRemover:
    def __init__(self):
        self.seen = set()
    
    def transform(self, data):
        key = data.get('url')
        if key in self.seen:
            return None
        self.seen.add(key)
        return data
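
Chaining these transformers over a single record (the values here are made up for illustration):

record = {
    'url': 'https://example.com/p/1',
    'name': '  Wireless   Mouse ',
    'price': '1,234,000đ',
}
for t in (PriceTransformer(), TextCleaner(), DuplicateRemover()):
    record = t.transform(record)
# record['price'] == 1234000, record['name'] == 'Wireless Mouse'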

Loaders

import pandas as pd

class CSVLoader:
    def __init__(self, filename):
        self.filename = filename
    
    def load(self, data):
        df = pd.DataFrame(data)
        df.to_csv(self.filename, index=False)

from sqlalchemy import create_engine

class PostgresLoader:
    def __init__(self, connection_string):
        self.engine = create_engine(connection_string)
    
    def load(self, data):
        df = pd.DataFrame(data)
        df.to_sql('products', self.engine, if_exists='append', index=False)

from elasticsearch import Elasticsearch

class ElasticsearchLoader:
    def __init__(self, host):
        self.es = Elasticsearch([host])
    
    def load(self, data):
        for item in data:
            # body= is the elasticsearch-py 7.x form; 8.x renamed it to document=
            self.es.index(index='products', body=item)
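
The loader above makes one HTTP request per record. For large batches, elasticsearch-py's bulk helper collapses this into a handful of requests; a sketch (BulkElasticsearchLoader is our name, not part of the library):

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

class BulkElasticsearchLoader:
    def __init__(self, host, index='products'):
        self.es = Elasticsearch([host])
        self.index = index
    
    def load(self, data):
        # One bulk call instead of len(data) individual index calls
        actions = ({'_index': self.index, '_source': item} for item in data)
        bulk(self.es, actions)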

Apache Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_task():
    # Scraping logic goes here; the return value is pushed to XCom
    scraped_data = []  # placeholder
    return scraped_data

def transform_task(ti):
    data = ti.xcom_pull(task_ids='extract')
    # Transform logic goes here
    transformed_data = data  # placeholder
    return transformed_data

def load_task(ti):
    data = ti.xcom_pull(task_ids='transform')
    # Load to database

with DAG(
    'scraping_pipeline',
    start_date=datetime(2026, 1, 1),
    schedule_interval='@daily',
) as dag:
    
    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_task
    )
    
    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_task
    )
    
    load = PythonOperator(
        task_id='load',
        python_callable=load_task
    )
    
    extract >> transform >> load
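
Note that XCom payloads are stored in the Airflow metadata database, so passing full scraped datasets between tasks does not scale. In practice, tasks write their output to intermediate storage (a database or object store) and pass file paths or keys through XCom instead.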

Luigi Pipeline

import json
import luigi

class ScrapeTask(luigi.Task):
    url = luigi.Parameter()
    
    def output(self):
        return luigi.LocalTarget(f'data/{self.url.split("/")[-1]}.json')
    
    def run(self):
        # scrape() is a placeholder for your extraction function
        data = scrape(self.url)
        with self.output().open('w') as f:
            json.dump(data, f)

class TransformTask(luigi.Task):
    def requires(self):
        # URLS is assumed to be a module-level list of target URLs
        return [ScrapeTask(url=url) for url in URLS]
    
    def output(self):
        return luigi.LocalTarget('data/transformed.json')
    
    def run(self):
        # Merge every scraped file into a single transformed output
        records = []
        for target in self.input():
            with target.open('r') as f:
                records.append(json.load(f))
        with self.output().open('w') as f:
            json.dump(records, f)
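
Luigi resolves the dependency graph from requires() and skips any task whose output file already exists, which makes re-runs idempotent. To run the whole pipeline:

# CLI: python pipeline.py TransformTask --local-scheduler
# or programmatically:
if __name__ == '__main__':
    luigi.build([TransformTask()], local_scheduler=True)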

VinaProxy + Data Pipelines

  • Reliable extraction with a proxy pool
  • Scale out your ETL jobs
  • From just $0.5/GB

Try It Now →