Data Pipeline Cho Scraping: ETL Tự Động Hóa
Scraping quy mô lớn cần data pipeline. Bài viết hướng dẫn xây dựng ETL pipeline cho scraped data.
ETL Là Gì?
- Extract: Thu thập data từ sources
- Transform: Clean, normalize, enrich
- Load: Lưu vào database/warehouse
Basic Pipeline Structure
class ScrapingPipeline:
    """Composable ETL pipeline: extract -> transform -> load.

    Extractors, transformers and loaders are registered via the
    ``add_*`` methods and executed in registration order by :meth:`run`.
    """

    def __init__(self):
        self.extractors = []
        self.transformers = []
        self.loaders = []

    def add_extractor(self, extractor):
        """Register an object with an ``extract(url)`` method."""
        self.extractors.append(extractor)

    def add_transformer(self, transformer):
        """Register an object with a ``transform(record)`` method."""
        self.transformers.append(transformer)

    def add_loader(self, loader):
        """Register an object with a ``load(records)`` method."""
        self.loaders.append(loader)

    def run(self, urls):
        """Run the full pipeline over *urls* and return the final records."""
        # Extract: every extractor visits every URL.
        raw_data = []
        for extractor in self.extractors:
            for url in urls:
                raw_data.append(extractor.extract(url))

        # Transform: apply each transformer in order.
        # BUG FIX: a transformer may return None to drop a record
        # (e.g. a duplicate filter); discard those instead of passing
        # None on to later transformers/loaders, which would crash.
        for transformer in self.transformers:
            raw_data = [transformer.transform(d) for d in raw_data]
            raw_data = [d for d in raw_data if d is not None]

        # Load: hand the final batch to every loader.
        for loader in self.loaders:
            loader.load(raw_data)
        return raw_data
Extractors
class ProductExtractor:
    """Extract product fields from an HTML product page.

    Returns a dict with url, name, price, raw_html and an ISO-8601
    extraction timestamp. Missing elements yield None rather than
    crashing the whole pipeline run.
    """

    def extract(self, url):
        # BUG FIX: a timeout prevents one hung request from stalling
        # the entire pipeline run indefinitely.
        response = requests.get(url, timeout=30)
        soup = BeautifulSoup(response.text, 'lxml')

        # BUG FIX: select_one returns None when the selector misses;
        # calling .text on it raised AttributeError for malformed pages.
        name_el = soup.select_one('.name')
        price_el = soup.select_one('.price')
        return {
            'url': url,
            'name': name_el.text if name_el is not None else None,
            'price': price_el.text if price_el is not None else None,
            'raw_html': response.text,
            'extracted_at': datetime.now().isoformat()
        }
class APIExtractor:
    """Extract a JSON payload from a REST API endpoint."""

    def extract(self, endpoint):
        # BUG FIX: a timeout prevents a hung request from blocking
        # the pipeline forever.
        response = requests.get(endpoint, timeout=30)
        return response.json()
Transformers
class PriceTransformer:
    """Normalize a localized price string to an integer amount.

    Example: "1,234,000đ" -> 1234000. When the price is missing,
    empty, or contains no digits, the field becomes None.
    """

    def transform(self, data):
        price_str = data.get('price', '')
        # Keep only digit characters ("1,234,000đ" -> "1234000").
        digits = re.sub(r'[^\d]', '', str(price_str) if price_str is not None else '')
        # BUG FIX: int('') raised ValueError when the price was missing
        # or had no digits; store None instead of crashing the run.
        data['price'] = int(digits) if digits else None
        return data
class TextCleaner:
    """Trim and collapse whitespace in selected text fields."""

    # Fields this transformer normalizes when present in a record.
    _TEXT_FIELDS = ('name', 'description')

    def transform(self, data):
        for field in self._TEXT_FIELDS:
            if field not in data:
                continue
            # split() discards leading/trailing whitespace and runs of
            # internal whitespace; rejoining yields single spaces.
            data[field] = ' '.join(data[field].split())
        return data
class DuplicateRemover:
    """Drop records whose URL was already seen in this run.

    Returns None for duplicates, which the pipeline treats as
    "discard this record".
    """

    def __init__(self):
        # URLs observed so far during this pipeline run.
        self.seen = set()

    def transform(self, data):
        url = data.get('url')
        if url not in self.seen:
            self.seen.add(url)
            return data
        # Duplicate: signal the pipeline to drop it.
        return None
Loaders
class CSVLoader:
    """Write a batch of records to a CSV file.

    Each call overwrites the target file with the given batch.
    """

    def __init__(self, filename):
        # Destination path for the CSV output.
        self.filename = filename

    def load(self, data):
        pd.DataFrame(data).to_csv(self.filename, index=False)
class PostgresLoader:
    """Append a batch of records to the 'products' table in PostgreSQL."""

    def __init__(self, connection_string):
        # SQLAlchemy engine; manages the underlying connection pool.
        self.engine = create_engine(connection_string)

    def load(self, data):
        frame = pd.DataFrame(data)
        # 'append' preserves rows from previous pipeline runs.
        frame.to_sql('products', self.engine, if_exists='append', index=False)
class ElasticsearchLoader:
    """Index each record into the 'products' Elasticsearch index."""

    def __init__(self, host):
        self.es = Elasticsearch([host])

    def load(self, data):
        # One index request per record; adequate for small batches.
        for record in data:
            self.es.index(index='products', body=record)
Apache Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime


def extract_task():
    """Extract step: run the scraping logic.

    The return value is automatically pushed to XCom under this task's id.
    """
    # BUG FIX: the original returned an undefined name (NameError);
    # use a concrete placeholder until real scraping logic is added.
    scraped_data = []  # TODO: real scraping logic
    return scraped_data


def transform_task(ti):
    """Transform step: pull the extract output from XCom and clean it."""
    data = ti.xcom_pull(task_ids='extract')
    # BUG FIX: the original returned an undefined name (NameError).
    transformed_data = data  # TODO: real transform logic
    return transformed_data


def load_task(ti):
    """Load step: pull the transformed data and persist it."""
    data = ti.xcom_pull(task_ids='transform')
    # TODO: load `data` into the target database.
    # BUG FIX: the original function body was only a comment, which is
    # a syntax error; bind the pulled data so the body is valid.
    return None


with DAG('scraping_pipeline', start_date=datetime(2026, 1, 1), schedule_interval='@daily') as dag:
    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_task
    )
    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_task
    )
    load = PythonOperator(
        task_id='load',
        python_callable=load_task
    )

    # Linear dependency chain: extract -> transform -> load.
    extract >> transform >> load
Luigi Pipeline
import json

import luigi


class ScrapeTask(luigi.Task):
    """Scrape a single URL and write the raw result as a JSON file."""

    url = luigi.Parameter()

    def output(self):
        # One JSON file per URL, named after the last path segment.
        return luigi.LocalTarget(f'data/{self.url.split("/")[-1]}.json')

    def run(self):
        # `scrape` is supplied elsewhere in the project.
        data = scrape(self.url)
        # BUG FIX: `json` was used here but never imported in the
        # original snippet (NameError at runtime).
        with self.output().open('w') as f:
            json.dump(data, f)


class TransformTask(luigi.Task):
    """Combine and transform all scraped per-URL files."""

    def requires(self):
        # URLS is the project-level list of pages to scrape.
        return [ScrapeTask(url=url) for url in URLS]

    def output(self):
        return luigi.LocalTarget('data/transformed.json')

    def run(self):
        # TODO: transform all scraped data.
        pass
VinaProxy + Data Pipelines
- Reliable extraction với proxy pool
- Scale ETL jobs
- Giá chỉ $0.5/GB
