В этом проекте мы создадим веб-скрапер - приложение для извлечения данных с веб-сайтов. Этот проект поможет закрепить знания по работе с сетевыми запросами, обработке HTML-документов, регулярным выражениям и сохранению данных. Мы будем использовать библиотеки requests для выполнения HTTP-запросов и BeautifulSoup для парсинга HTML-контента.
Перед началом реализации определим основные компоненты веб-скрапера:
Перед началом работы убедитесь, что у вас установлены необходимые библиотеки:
pip install requests beautifulsoup4 lxml
Пояснение к библиотекам:
Создадим класс WebScraper, который будет содержать всю логику работы скрапера:
import requests
from bs4 import BeautifulSoup
import json
import csv
import time
from urllib.parse import urljoin, urlparse
import os
class WebScraper:
def __init__(self):
# Заголовки для имитации браузера
self.заголовки = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'ru-RU,ru;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
}
self.сессия = requests.Session()
self.сессия.headers.update(self.заголовки)
def получить_страницу(self, url):
"""Получает HTML-контент страницы по указанному URL"""
try:
ответ = self.сессия.get(url, timeout=10)
ответ.raise_for_status() # Проверка на ошибки HTTP
ответ.encoding = 'utf-8' # Установка правильной кодировки
return ответ.text
except requests.exceptions.RequestException as e:
print(f"Ошибка при получении страницы {url}: {e}")
return None
def парсить_страницу(self, html):
"""Парсит HTML-контент и возвращает объект BeautifulSoup"""
if html:
return BeautifulSoup(html, 'lxml')
return None
def извлечь_заголовки(self, soup):
"""Извлекает все заголовки (h1-h6) со страницы"""
if not soup:
return []
заголовки = []
for i in range(1, 7):
for элемент in soup.find_all(f'h{i}'):
if элемент.text.strip():
заголовки.append({
'уровень': f'h{i}',
'текст': элемент.text.strip()
})
return заголовки
def извлечь_ссылки(self, soup, базовый_url=''):
"""Извлекает все ссылки со страницы"""
if not soup:
return []
ссылки = []
for элемент in soup.find_all('a', href=True):
href = элемент['href']
текст = элемент.text.strip()
# Преобразование относительных ссылок в абсолютные
if базовый_url:
href = urljoin(базовый_url, href)
if href and текст:
ссылки.append({
'текст': текст,
'url': href
})
return ссылки
def извлечь_изображения(self, soup, базовый_url=''):
"""Извлекает все изображения со страницы"""
if not soup:
return []
изображения = []
for элемент in soup.find_all('img', src=True):
src = элемент['src']
alt = элемент.get('alt', '')
# Преобразование относительных путей в абсолютные
if базовый_url:
src = urljoin(базовый_url, src)
if src:
изображения.append({
'src': src,
'alt': alt
})
return изображения
def извлечь_текст(self, soup):
"""Извлекает основной текст со страницы"""
if not soup:
return ""
# Удаление скриптов и стилей
for элемент in soup(['script', 'style']):
элемент.decompose()
# Извлечение текста
текст = soup.get_text()
# Очистка текста
строки = (строка.strip() for строка in текст.splitlines())
фрагменты = (фраза.strip() for строка in строки for фраза in строка.split(" "))
текст = ' '.join(фрагмент for фрагмент in фрагменты if фрагмент)
return текст
def сохранить_в_json(self, данные, имя_файла):
"""Сохраняет данные в JSON-файл"""
try:
with open(имя_файла, 'w', encoding='utf-8') as файл:
json.dump(данные, файл, ensure_ascii=False, indent=2)
print(f"Данные сохранены в {имя_файла}")
except Exception as e:
print(f"Ошибка при сохранении в JSON: {e}")
def сохранить_в_csv(self, данные, имя_файла, поля):
"""Сохраняет данные в CSV-файл"""
try:
with open(имя_файла, 'w', newline='', encoding='utf-8') as файл:
писатель = csv.DictWriter(файл, fieldnames=поля)
писатель.writeheader()
писатель.writerows(данные)
print(f"Данные сохранены в {имя_файла}")
except Exception as e:
print(f"Ошибка при сохранении в CSV: {e}")
def скрапить_сайт(self, url):
"""Основной метод для скрапинга сайта"""
print(f"Начинаем скрапинг сайта: {url}")
# Получение HTML-контента
html = self.получить_страницу(url)
if not html:
return
# Парсинг страницы
soup = self.парсить_страницу(html)
if not soup:
return
# Извлечение данных
данные = {
'url': url,
'заголовки': self.извлечь_заголовки(soup),
'ссылки': self.извлечь_ссылки(soup, url),
'изображения': self.извлечь_изображения(soup, url),
'текст': self.извлечь_текст(soup)[:1000] # Ограничиваем текст 1000 символами
}
return данные
# Пример использования
if __name__ == "__main__":
скрапер = WebScraper()
# Скрапинг примера сайта
url = "https://httpbin.org/html" # Тестовый сайт
данные = скрапер.скрапить_сайт(url)
if данные:
# Сохранение в JSON
скрапер.сохранить_в_json(данные, "результат_скрапинга.json")
# Сохранение ссылок в CSV
if данные['ссылки']:
скрапер.сохранить_в_csv(данные['ссылки'], "ссылки.csv", ['текст', 'url'])
# Вывод результатов
print("\n=== Результаты скрапинга ===")
print(f"URL: {данные['url']}")
print(f"Найдено заголовков: {len(данные['заголовки'])}")
print(f"Найдено ссылок: {len(данные['ссылки'])}")
print(f"Найдено изображений: {len(данные['изображения'])}")
print(f"Извлечено текста: {len(данные['текст'])} символов")
Добавим дополнительные функции и улучшим обработку ошибок:
import requests
from bs4 import BeautifulSoup
import json
import csv
import time
from urllib.parse import urljoin, urlparse
import os
import random
from datetime import datetime
class AdvancedWebScraper:
def __init__(self, задержка=1):
# Задержка между запросами (в секундах)
self.задержка = задержка
# Различные User-Agent для имитации разных браузеров
self.user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15'
]
self.сессия = requests.Session()
self.обновить_заголовки()
# Статистика скрапинга
self.статистика = {
'запросов': 0,
'успешных': 0,
'ошибок': 0,
'начало': datetime.now()
}
def обновить_заголовки(self):
"""Обновляет заголовки с случайным User-Agent"""
заголовки = {
'User-Agent': random.choice(self.user_agents),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'ru-RU,ru;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
}
self.сессия.headers.update(заголовки)
def получить_страницу(self, url, попытки=3):
"""Получает HTML-контент страницы с возможностью повторных попыток"""
for попытка in range(попытки):
try:
# Случайная задержка перед запросом
time.sleep(random.uniform(0.5, self.задержка))
# Обновление заголовков для каждого запроса
self.обновить_заголовки()
ответ = self.сессия.get(url, timeout=15)
self.статистика['запросов'] += 1
if ответ.status_code == 200:
ответ.encoding = 'utf-8'
self.статистика['успешных'] += 1
return ответ.text
elif ответ.status_code == 429: # Слишком много запросов
print(f"Слишком много запросов. Ждем 5 секунд...")
time.sleep(5)
continue
else:
print(f"HTTP ошибка {ответ.status_code} для {url}")
except requests.exceptions.Timeout:
print(f"Таймаут при запросе {url} (попытка {попытка + 1})")
except requests.exceptions.ConnectionError:
print(f"Ошибка соединения при запросе {url} (попытка {попытка + 1})")
except requests.exceptions.RequestException as e:
print(f"Ошибка при запросе {url}: {e} (попытка {попытка + 1})")
self.статистика['ошибок'] += 1
# Пауза перед повторной попыткой
if попытка < 2:
time.sleep(2 ** попытка) # Экспоненциальная задержка
return None
def парсить_страницу(self, html):
"""Парсит HTML-контент и возвращает объект BeautifulSoup"""
if html:
try:
return BeautifulSoup(html, 'lxml')
except Exception as e:
print(f"Ошибка при парсинге HTML: {e}")
try:
return BeautifulSoup(html, 'html.parser')
except Exception as e2:
print(f"Ошибка при парсинге с html.parser: {e2}")
return None
def извлечь_мета_данные(self, soup):
"""Извлекает мета-данные страницы"""
if not soup:
return {}
мета = {}
# Заголовок страницы
заголовок = soup.find('title')
мета['заголовок'] = заголовок.text.strip() if заголовок else ""
# Мета-теги
for мета_тег in soup.find_all('meta'):
if мета_тег.get('name'):
имя = мета_тег['name']
значение = мета_тег.get('content', '')
мета[имя] = значение
elif мета_тег.get('property'):
свойство = мета_тег['property']
значение = мета_тег.get('content', '')
мета[свойство] = значение
return мета
def извлечь_заголовки(self, soup):
"""Извлекает все заголовки (h1-h6) со страницы"""
if not soup:
return []
заголовки = []
for i in range(1, 7):
for элемент in soup.find_all(f'h{i}'):
if элемент.text.strip():
заголовки.append({
'уровень': f'h{i}',
'текст': элемент.text.strip(),
'id': элемент.get('id', '')
})
return заголовки
def извлечь_ссылки(self, soup, базовый_url=''):
"""Извлекает все ссылки со страницы"""
if not soup:
return []
ссылки = []
for элемент in soup.find_all('a', href=True):
href = элемент['href']
текст = элемент.text.strip()
# Преобразование относительных ссылок в абсолютные
if базовый_url:
href = urljoin(базовый_url, href)
if href and текст:
ссылки.append({
'текст': текст,
'url': href,
'title': элемент.get('title', '')
})
return ссылки
def извлечь_изображения(self, soup, базовый_url=''):
"""Извлекает все изображения со страницы"""
if not soup:
return []
изображения = []
for элемент in soup.find_all('img', src=True):
src = элемент['src']
alt = элемент.get('alt', '')
title = элемент.get('title', '')
# Преобразование относительных путей в абсолютные
if базовый_url:
src = urljoin(базовый_url, src)
if src:
изображения.append({
'src': src,
'alt': alt,
'title': title
})
return изображения
def извлечь_таблицы(self, soup):
"""Извлекает все таблицы со страницы"""
if not soup:
return []
таблицы = []
for таблица in soup.find_all('table'):
данные_таблицы = []
for строка in таблица.find_all('tr'):
ячейки = []
for ячейка in строка.find_all(['td', 'th']):
ячейки.append(ячейка.get_text(strip=True))
if ячейки:
данные_таблицы.append(ячейки)
if данные_таблицы:
таблицы.append(данные_таблицы)
return таблицы
def извлечь_текст(self, soup):
"""Извлекает основной текст со страницы"""
if not soup:
return ""
# Удаление скриптов и стилей
for элемент in soup(['script', 'style', 'nav', 'footer', 'header']):
элемент.decompose()
# Извлечение текста из основных тегов
текст = ""
for элемент in soup.find_all(['p', 'div', 'span', 'article', 'section']):
текст_элемента = элемент.get_text(strip=True)
if текст_элемента and len(текст_элемента) > 10: # Игнорируем короткие фрагменты
текст += текст_элемента + "\n\n"
if not текст:
# Если не нашли текст в основных тегах, извлекаем весь текст
текст = soup.get_text()
# Очистка текста
строки = (строка.strip() for строка in текст.splitlines())
фрагменты = (фраза.strip() for строка in строки for фраза in строка.split(" "))
текст = "\n".join(фрагмент for фрагмент in фрагменты if фрагмент)
return текст
def сохранить_в_json(self, данные, имя_файла):
"""Сохраняет данные в JSON-файл"""
try:
with open(имя_файла, 'w', encoding='utf-8') as файл:
json.dump(данные, файл, ensure_ascii=False, indent=2, default=str)
print(f"Данные сохранены в {имя_файла}")
except Exception as e:
print(f"Ошибка при сохранении в JSON: {e}")
def сохранить_в_csv(self, данные, имя_файла, поля):
"""Сохраняет данные в CSV-файл"""
if not данные:
print("Нет данных для сохранения в CSV")
return
try:
with open(имя_файла, 'w', newline='', encoding='utf-8') as файл:
писатель = csv.DictWriter(файл, fieldnames=поля)
писатель.writeheader()
for запись in данные:
try:
писатель.writerow(запись)
except Exception as e:
print(f"Ошибка при записи строки в CSV: {e}")
continue
print(f"Данные сохранены в {имя_файла}")
except Exception as e:
print(f"Ошибка при сохранении в CSV: {e}")
def скрапить_сайт(self, url):
"""Основной метод для скрапинга сайта"""
print(f"Начинаем скрапинг сайта: {url}")
# Получение HTML-контента
html = self.получить_страницу(url)
if not html:
print(f"Не удалось получить страницу: {url}")
return None
# Парсинг страницы
soup = self.парсить_страницу(html)
if not soup:
print(f"Не удалось распарсить страницу: {url}")
return None
# Извлечение данных
данные = {
'url': url,
'дата_скрапинга': datetime.now().isoformat(),
'мета_данные': self.извлечь_мета_данные(soup),
'заголовки': self.извлечь_заголовки(soup),
'ссылки': self.извлечь_ссылки(soup, url),
'изображения': self.извлечь_изображения(soup, url),
'таблицы': self.извлечь_таблицы(soup),
'текст': self.извлечь_текст(soup)[:5000] # Ограничиваем текст 5000 символами
}
return данные
def скрапить_несколько_страниц(self, urls):
"""Скрапит несколько страниц и объединяет результаты"""
все_данные = []
for i, url in enumerate(urls, 1):
print(f"\nОбработка страницы {i} из {len(urls)}: {url}")
данные = self.скрапить_сайт(url)
if данные:
все_данные.append(данные)
return все_данные
def показать_статистику(self):
"""Показывает статистику скрапинга"""
время_работы = datetime.now() - self.статистика['начало']
print("\n=== Статистика скрапинга ===")
print(f"Всего запросов: {self.статистика['запросов']}")
print(f"Успешных: {self.статистика['успешных']}")
print(f"Ошибок: {self.статистика['ошибок']}")
print(f"Время работы: {время_работы}")
if self.статистика['запросов'] > 0:
процент_успеха = (self.статистика['успешных'] / self.статистика['запросов']) * 100
print(f"Процент успеха: {процент_успеха:.1f}%")
# Пример использования расширенного скрапера
if __name__ == "__main__":
скрапер = AdvancedWebScraper(задержка=2)
# Скрапинг нескольких тестовых сайтов
urls = [
"https://httpbin.org/html",
"https://httpbin.org/links/10"
]
# Скрапинг всех страниц
все_данные = скрапер.скрапить_несколько_страниц(urls)
# Сохранение результатов
if все_данные:
скрапер.сохранить_в_json(все_данные, "все_результаты_скрапинга.json")
# Сохранение всех ссылок в один CSV файл
все_ссылки = []
for данные in все_данные:
все_ссылки.extend(данные['ссылки'])
if все_ссылки:
скрапер.сохранить_в_csv(все_ссылки, "все_ссылки.csv", ['текст', 'url', 'title'])
# Вывод сводной информации
print("\n=== Сводная информация ===")
for i, данные in enumerate(все_данные, 1):
print(f"\nСтраница {i}: {данные['url']}")
print(f" Заголовков: {len(данные['заголовки'])}")
print(f" Ссылок: {len(данные['ссылки'])}")
print(f" Изображений: {len(данные['изображения'])}")
print(f" Таблиц: {len(данные['таблицы'])}")
print(f" Текста: {len(данные['текст'])} символов")
# Показ статистики
скрапер.показать_статистику()
Рассмотрим практические примеры использования веб-скрапера для реальных задач:
# Пример 1: Скрапинг новостного сайта
def скрапить_новости(url):
скрапер = AdvancedWebScraper()
данные = скрапер.скрапить_сайт(url)
if данные:
# Извлечение только заголовков новостей
новости = []
for заголовок in данные['заголовки']:
if заголовок['уровень'] in ['h1', 'h2', 'h3']:
новости.append(заголовок['текст'])
return новости
return []
# Пример 2: Скрапинг курсов валют
def скрапить_курсы_валют():
скрапер = AdvancedWebScraper()
# Используем тестовый API для демонстрации
url = "https://httpbin.org/json"
данные = скрапер.скрапить_сайт(url)
return данные
# Пример 3: Скрапинг изображений с сохранением
def скачать_изображения(url, папка="изображения"):
скрапер = AdvancedWebScraper()
данные = скрапер.скрапить_сайт(url)
if данные and данные['изображения']:
# Создание папки для изображений
if not os.path.exists(папка):
os.makedirs(папка)
for i, изображение in enumerate(данные['изображения']):
try:
# Скачивание изображения
ответ = скрапер.сессия.get(изображение['src'], timeout=10)
if ответ.status_code == 200:
# Определение расширения файла
расширение = '.jpg'
if '.png' in изображение['src']:
расширение = '.png'
elif '.gif' in изображение['src']:
расширение = '.gif'
# Сохранение файла
имя_файла = os.path.join(папка, f"изображение_{i+1}{расширение}")
with open(имя_файла, 'wb') as файл:
файл.write(ответ.content)
print(f"Сохранено: {имя_файла}")
except Exception as e:
print(f"Ошибка при скачивании изображения {изображение['src']}: {e}")
# Пример 4: Поиск email-адресов на странице
import re
def найти_email(url):
скрапер = AdvancedWebScraper()
данные = скрапер.скрапить_сайт(url)
if данные:
# Поиск email-адресов в тексте
email_паттерн = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
найденные_email = re.findall(email_паттерн, данные['текст'])
return list(set(найденные_email)) # Удаление дубликатов
return []
# Пример использования
if __name__ == "__main__":
# Пример скрапинга новостей
print("=== Скрапинг новостей ===")
новости = скрапить_новости("https://httpbin.org/html")
for новость in новости:
print(f"- {новость}")
# Пример поиска email
print("\n=== Поиск email ===")
email_адреса = найти_email("https://httpbin.org/html")
if email_адреса:
for email in email_адреса:
print(f"- {email}")
else:
print("Email-адреса не найдены")
Попробуйте выполнить следующие упражнения, чтобы закрепить полученные знания:
Создайте скрапер, который извлекает заголовки новостей, даты публикации и краткое описание с новостного сайта. Сохраните данные в CSV-файл с колонками: заголовок, дата, описание.
Разработайте скрапер для извлечения текущих курсов валют с финансового сайта. Реализуйте функцию, которая сравнивает курсы с предыдущими значениями и определяет, какие валюты подорожали, а какие подешевели.
Улучшите функцию скачивания изображений, добавив фильтрацию по размеру (только изображения больше 100KB) и формату (только JPG и PNG). Добавьте прогресс-бар для отображения процесса скачивания.
Создайте скрапер, который может авторизоваться на сайте с помощью логина и пароля, а затем извлекать данные с защищенных страниц. Используйте сессии requests для сохранения cookies.
Реализуйте многопоточную версию скрапера, которая может одновременно обрабатывать несколько URL. Используйте модуль concurrent.futures для управления потоками.
Вот примеры решений для некоторых упражнений:
# Решение упражнения 1: Скрапер новостей
class NewsScraper(AdvancedWebScraper):
def извлечь_новости(self, url):
данные = self.скрапить_сайт(url)
if not данные:
return []
новости = []
for заголовок in данные['заголовки']:
if заголовок['уровень'] in ['h1', 'h2', 'h3']:
новости.append({
'заголовок': заголовок['текст'],
'дата': datetime.now().strftime('%Y-%m-%d'),
'описание': 'Краткое описание новости...' # В реальном скрапере извлекать из текста
})
return новости
def сохранить_новости(self, новости, имя_файла):
self.сохранить_в_csv(новости, имя_файла, ['заголовок', 'дата', 'описание'])
# Решение упражнения 3: Скрапер изображений с фильтрацией
def скачать_изображения_с_фильтрацией(url, папка="изображения", мин_размер=100*1024):
скрапер = AdvancedWebScraper()
данные = скрапер.скрапить_сайт(url)
if not данные or not данные['изображения']:
print("Изображения не найдены")
return
# Создание папки для изображений
if not os.path.exists(папка):
os.makedirs(папка)
скачанные = 0
всего = len(данные['изображения'])
print(f"Найдено изображений: {всего}")
for i, изображение in enumerate(данные['изображения']):
try:
# Проверка формата
if not any(формат in изображение['src'].lower() for формат in ['.jpg', '.jpeg', '.png']):
continue
# Скачивание изображения
ответ = скрапер.сессия.get(изображение['src'], timeout=10)
if ответ.status_code == 200:
# Проверка размера
if len(ответ.content) < 100*1024: # Меньше 100KB
continue
# Определение расширения файла
if '.png' in изображение['src'].lower():
расширение = '.png'
else:
расширение = '.jpg'
# Сохранение файла
имя_файла = os.path.join(папка, f"изображение_{i+1}{расширение}")
with open(имя_файла, 'wb') as файл:
файл.write(ответ.content)
скачанные += 1
print(f"[{скачанные}/{всего}] Сохранено: {имя_файла} ({len(ответ.content)} bytes)")
except Exception as e:
print(f"Ошибка при скачивании изображения {изображение['src']}: {e}")
continue
print(f"\nВсего скачано изображений: {скачанные}")
# Решение упражнения 5: Многопоточный скрапер
from concurrent.futures import ThreadPoolExecutor, as_completed
class MultiThreadedScraper:
def __init__(self, max_workers=5):
self.max_workers = max_workers
def скрапить_url(self, url):
скрапер = AdvancedWebScraper()
return скрапер.скрапить_сайт(url)
def скрапить_многопоточно(self, urls):
результаты = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Запуск задач
будущие_задачи = {executor.submit(self.скрапить_url, url): url for url in urls}
# Обработка результатов по мере завершения
for будущее in as_completed(будущие_задачи):
url = будущие_задачи[будущее]
try:
данные = будущее.result()
if данные:
результаты.append(данные)
print(f"Успешно обработан: {url}")
else:
print(f"Ошибка при обработке: {url}")
except Exception as e:
print(f"Исключение при обработке {url}: {e}")
return результаты
# Пример использования многопоточного скрапера
if __name__ == "__main__":
urls = [
"https://httpbin.org/html",
"https://httpbin.org/json",
"https://httpbin.org/xml"
]
многопоточный_скрапер = MultiThreadedScraper(max_workers=3)
результаты = многопоточный_скрапер.скрапить_многопоточно(urls)
print(f"\nОбработано URL: {len(результаты)}")
При работе с веб-скрапингом важно соблюдать этические нормы и технические ограничения: