Setting the Stage: Context for the Curious Book Reader
This entry chronicles a critical refactoring journey within the Pipulate project, addressing the initial over-engineering of Selenium-based browser automation tools. It highlights the power of iterative, AI-guided development using precise patching mechanisms to transform complex, error-prone code into a robust, predictable, and maintainable system. Readers will gain insight into practical strategies for achieving cleaner code, implementing flexible API designs (like toggling headless browser mode), and establishing deterministic file organization for scraped web artifacts, all while navigating the challenges of technical debt and third-party warnings.
Technical Journal Entry Begins
Alright, this is now the new starting point. We are going to clean this thing up and make it as standalone as possible, getting rid of all the dependencies loaded in from other places, and make it clear and easy to look at.
The Problem: Over-Engineered Selenium
It’s like the AI chose the most difficult possible starting template for browser automation. Okay, what would I choose, having created the system myself and knowing there are cleaner approaches already in it?
I have the Selenium URL Opener plugin, 820_selenium.py.
Introducing the Cleaner Automation Frameworks
import asyncio
import json
import os
import shutil
import tempfile
from datetime import datetime
from pathlib import Path
from fasthtml.common import *
from loguru import logger
from imports.crud import Step # 🎯 STANDARDIZED: Import centralized Step definition
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from starlette.responses import HTMLResponse
from webdriver_manager.chrome import ChromeDriverManager
ROLES = ['Components']
'\nPipulate Selenium URL Opener Widget Workflow\nA workflow for demonstrating opening a URL in a Selenium-controlled Chrome browser.\n'
class SeleniumUrlOpenerWidget:
"""
Selenium URL Opener Widget Workflow
Demonstrates opening a URL using Selenium and ChromeDriver.
"""
APP_NAME = 'selenium_url_opener'
DISPLAY_NAME = 'Selenium URL Opener'
ENDPOINT_MESSAGE = 'This workflow demonstrates opening a URL in a new Chrome browser window controlled by Selenium.'
TRAINING_PROMPT = 'This workflow is for demonstrating and testing the Selenium URL opener. The user will input a URL, which will then be opened in a Selenium-controlled Chrome browser.'
def __init__(self, app, pipulate, pipeline, db, app_name=APP_NAME):
"""Initialize the workflow, define steps, and register routes."""
self.app = app
self.app_name = app_name
self.pipulate = pipulate
self.pipeline = pipeline
self.steps_indices = {}
pip = self.pipulate
self.message_queue = pip.message_queue
steps = [Step(id='step_01', done='selenium_url', show='URL for Selenium', refill=True, transform=lambda prev_value: prev_value.strip() if prev_value else '')]
routes = [(f'/{app_name}', self.landing), (f'/{app_name}/init', self.init, ['POST']), (f'/{app_name}/revert', self.handle_revert, ['POST']), (f'/{app_name}/finalize', self.finalize, ['GET', 'POST']), (f'/{app_name}/unfinalize', self.unfinalize, ['POST']), (f'/{app_name}/reopen_selenium_url', self.reopen_selenium_url, ['POST'])]
self.steps = steps
for step in steps:
step_id = step.id
routes.append((f'/{app_name}/{step_id}', getattr(self, step_id)))
routes.append((f'/{app_name}/{step_id}_submit', getattr(self, f'{step_id}_submit'), ['POST']))
for path, handler, *methods in routes:
method_list = methods[0] if methods else ['GET']
app.route(path, methods=method_list)(handler)
self.step_messages = {'finalize': {'ready': 'All steps complete. Ready to finalize workflow.', 'complete': f'Workflow finalized. Use {pip.UNLOCK_BUTTON_LABEL} to make changes.'}, 'step_01': {'input': 'Please enter the URL to open with Selenium.', 'complete': 'URL processed for Selenium.'}}
steps.append(Step(id='finalize', done='finalized', show='Finalize', refill=False))
self.steps_indices = {step.id: i for i, step in enumerate(steps)}
async def landing(self, request):
"""Generate the landing page using the standardized helper while maintaining WET explicitness."""
pip = self.pipulate
# Use centralized landing page helper - maintains WET principle by explicit call
return pip.create_standard_landing_page(self)
async def init(self, request):
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
form = await request.form()
user_input = form.get('pipeline_id', '').strip()
if not user_input:
from starlette.responses import Response
response = Response('')
response.headers['HX-Refresh'] = 'true'
return response
context = pip.get_plugin_context(self)
profile_name = context['profile_name'] or 'default'
plugin_name = app_name # Use app_name directly to ensure consistency
profile_part = profile_name.replace(' ', '_')
plugin_part = plugin_name.replace(' ', '_')
expected_prefix = f'{profile_part}-{plugin_part}-'
if user_input.startswith(expected_prefix):
pipeline_id = user_input
else:
_, temp_prefix, user_provided_id_part = pip.generate_pipeline_key(self, user_input)
pipeline_id = f'{expected_prefix}{user_provided_id_part}'
pip.db['pipeline_id'] = pipeline_id
state, error = pip.initialize_if_missing(pipeline_id, {'app_name': app_name})
if error:
return error
await self.message_queue.add(pip, f'Workflow ID: {pipeline_id}', verbatim=True, spaces_before=0)
await self.message_queue.add(pip, f"Return later by selecting '{pipeline_id}' from the dropdown.", verbatim=True, spaces_before=0)
return pip.run_all_cells(app_name, steps)
async def finalize(self, request):
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
pipeline_id = pip.db.get('pipeline_id', 'unknown')
finalize_step = steps[-1]
finalize_data = pip.get_step_data(pipeline_id, finalize_step.id, {})
if request.method == 'GET':
if finalize_step.done in finalize_data:
return Card(H3('Workflow is locked.'), Form(Button(pip.UNLOCK_BUTTON_LABEL, type='submit', cls='secondary outline'), hx_post=f'/{app_name}/unfinalize', hx_target=f'#{app_name}-container', hx_swap='outerHTML'), id=finalize_step.id)
else:
all_steps_complete = all((pip.get_step_data(pipeline_id, step.id, {}).get(step.done) for step in steps[:-1]))
if all_steps_complete:
return Card(H3('All steps complete. Finalize?'), P('You can revert to any step and make changes.', cls='text-secondary'), Form(Button('Finalize 🔒', type='submit', cls='primary'), hx_post=f'/{app_name}/finalize', hx_target=f'#{app_name}-container', hx_swap='outerHTML'), id=finalize_step.id)
else:
return Div(id=finalize_step.id)
else:
await pip.finalize_workflow(pipeline_id)
await self.message_queue.add(pip, self.step_messages['finalize']['complete'], verbatim=True)
return pip.run_all_cells(app_name, steps)
async def unfinalize(self, request):
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
pipeline_id = pip.db.get('pipeline_id', 'unknown')
await pip.unfinalize_workflow(pipeline_id)
await self.message_queue.add(pip, 'Workflow unfinalized! You can now revert to any step and make changes.', verbatim=True)
return pip.run_all_cells(app_name, steps)
async def get_suggestion(self, step_id, state):
if step_id == 'step_01':
return 'https://www.google.com'
return ''
async def handle_revert(self, request):
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
form = await request.form()
step_id = form.get('step_id')
pipeline_id = pip.db.get('pipeline_id', 'unknown')
if not step_id:
return P('Error: No step specified', cls='text-invalid')
await pip.clear_steps_from(pipeline_id, step_id, steps)
state = pip.read_state(pipeline_id)
state['_revert_target'] = step_id
pip.write_state(pipeline_id, state)
message = await pip.get_state_message(pipeline_id, steps, self.step_messages)
await self.message_queue.add(pip, message, verbatim=True)
return pip.run_all_cells(app_name, steps)
def _create_selenium_url_display(self, url_value, step_id):
"""Helper method to create the display for the URL and reopen button for Selenium."""
return Div(P(f'URL configured: ', B(url_value)), Form(Input(type='hidden', name='url', value=url_value), Button('Open URL Again 🪄', type='submit', cls='secondary'), hx_post=f'/{self.app_name}/reopen_selenium_url', hx_target=f'#{step_id}-status', hx_swap='innerHTML'), Div(id=f'{step_id}-status'))
async def _execute_selenium_open(self, url_to_open):
"""Core Selenium logic to open a URL."""
pip = self.pipulate
try:
chrome_options = Options()
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--new-window')
chrome_options.add_argument('--start-maximized')
profile_dir = tempfile.mkdtemp()
chrome_options.add_argument(f'--user-data-dir={profile_dir}')
effective_os = os.environ.get('EFFECTIVE_OS', 'unknown')
await self.message_queue.add(pip, f'Effective OS for Selenium: {effective_os}', verbatim=True)
if effective_os == 'darwin':
await self.message_queue.add(pip, 'Using webdriver-manager for macOS.', verbatim=True)
service = Service(ChromeDriverManager().install())
else:
await self.message_queue.add(pip, "Attempting to use system ChromeDriver (ensure it's in PATH).", verbatim=True)
service = Service()
await self.message_queue.add(pip, 'Initializing Chrome driver with Selenium...', verbatim=True)
driver = webdriver.Chrome(service=service, options=chrome_options)
await self.message_queue.add(pip, f'Selenium opening URL: {url_to_open}', verbatim=True)
driver.get(url_to_open)
await asyncio.sleep(2)
page_title = driver.title
await self.message_queue.add(pip, f'Selenium page loaded. Title: {page_title}', verbatim=True)
await asyncio.sleep(5)
driver.quit()
await self.message_queue.add(pip, 'Selenium browser closed.', verbatim=True)
shutil.rmtree(profile_dir, ignore_errors=True)
return (True, f'Successfully opened and closed: {url_to_open}. Page title: {page_title}')
except Exception as e:
error_msg = f'Selenium error: {str(e)}'
logger.error(error_msg)
await self.message_queue.add(pip, error_msg, verbatim=True)
if 'profile_dir' in locals() and os.path.exists(profile_dir):
shutil.rmtree(profile_dir, ignore_errors=True)
return (False, error_msg)
async def step_01(self, request):
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
step_id = 'step_01'
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = 'finalize'
pipeline_id = pip.db.get('pipeline_id', 'unknown')
state = pip.read_state(pipeline_id)
step_data = pip.get_step_data(pipeline_id, step_id, {})
user_val = step_data.get(step.done, '')
finalize_data = pip.get_step_data(pipeline_id, 'finalize', {})
if 'finalized' in finalize_data and user_val:
url_widget_display = self._create_selenium_url_display(user_val, step_id)
return Div(Card(H3(f'🔒 {step.show}'), url_widget_display), Div(id=next_step_id, hx_get=f'/{app_name}/{next_step_id}', hx_trigger='load'), id=step_id)
elif user_val and state.get('_revert_target') != step_id:
url_widget_display = self._create_selenium_url_display(user_val, step_id)
content_container = pip.display_revert_widget(step_id=step_id, app_name=app_name, message=f'{step.show}: {user_val}', widget=url_widget_display, steps=steps)
return Div(content_container, Div(id=next_step_id, hx_get=f'/{app_name}/{next_step_id}', hx_trigger='load'), id=step_id)
else:
display_value = user_val if step.refill and user_val else await self.get_suggestion(step_id, state)
await self.message_queue.add(pip, self.step_messages[step_id]['input'], verbatim=True)
explanation = 'Enter a URL to open with Selenium (e.g., https://www.google.com).'
await self.message_queue.add(pip, explanation, verbatim=True)
return Div(Card(H3(f'{pip.fmt(step_id)}: Configure {step.show}'), P(explanation, cls='text-secondary'), Form(Div(Input(type='url', name=step.done, placeholder='https://www.google.com', required=True, value=display_value, cls='contrast'), Div(Button('Open with Selenium ▸', type='submit', cls='primary'), style='margin-top: 1vh; text-align: right;'), cls='w-full'), hx_post=f'/{app_name}/{step_id}_submit', hx_target=f'#{step_id}')), Div(id=next_step_id), id=step_id)
async def step_01_submit(self, request):
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
step_id = 'step_01'
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = 'finalize'
pipeline_id = pip.db.get('pipeline_id', 'unknown')
form = await request.form()
url_to_open = form.get(step.done, '').strip()
if not url_to_open:
return P('Error: URL is required', cls='text-invalid')
if not url_to_open.startswith(('http://', 'https://')):
url_to_open = f'https://{url_to_open}'
await pip.set_step_data(pipeline_id, step_id, url_to_open, steps)
success, message = await self._execute_selenium_open(url_to_open)
pip.append_to_history(f'[WIDGET ACTION] {step.show}: Attempted to open URL {url_to_open}. Success: {success}. Message: {message}')
url_widget_display = self._create_selenium_url_display(url_to_open, step_id)
status_message_widget = P(message, cls='text-valid' if success else 'text-invalid')
combined_widget = Div(url_widget_display, status_message_widget)
content_container = pip.display_revert_widget(step_id=step_id, app_name=app_name, message=f'{step.show}: URL processed - {url_to_open}', widget=combined_widget, steps=steps)
response_content = Div(content_container, Div(id=next_step_id, hx_get=f'/{app_name}/{next_step_id}', hx_trigger='load'), id=step_id)
await self.message_queue.add(pip, f'{step.show} complete. {message}', verbatim=True)
if pip.check_finalize_needed(step_index, steps):
await self.message_queue.add(pip, self.step_messages['finalize']['ready'], verbatim=True)
return HTMLResponse(to_xml(response_content))
async def reopen_selenium_url(self, request):
"""Handles reopening a URL with Selenium via a button press."""
pip = self.pipulate
form = await request.form()
url_to_open = form.get('url', '').strip()
if not url_to_open:
return P('Error: URL for reopening is missing.', cls='text-invalid')
success, message = await self._execute_selenium_open(url_to_open)
if success:
return P(f"Successfully reopened: {url_to_open}. Page title: {message.split('Page title: ')[-1]}", style='color: green;')
else:
return P(f'Error reopening URL: {message}', cls='text-invalid')
I also have 440_browser_automation.py:
import asyncio
import json
import os
from datetime import datetime
from urllib.parse import quote, urlparse
from fasthtml.common import *
from loguru import logger
from imports.crud import Step # 🎯 STANDARDIZED: Import centralized Step definition
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from seleniumwire import webdriver as wire_webdriver
from starlette.responses import HTMLResponse, JSONResponse
from webdriver_manager.chrome import ChromeDriverManager
ROLES = ['Workshop']
'\nPipulate Browser Automation Workflow\n\nThis workflow demonstrates Selenium-based browser automation capabilities:\n- Cross-platform Chrome automation (Linux/macOS)\n- Clean browser sessions with temporary profiles\n- Detailed status logging and error handling\n- URL opening and verification\n'
def get_safe_path(url):
"""Convert URL to filesystem-safe path while maintaining reversibility."""
parsed = urlparse(url)
domain = parsed.netloc
path = parsed.path
if not path or path == '/':
path = '/'
path = quote(path + ('?' + parsed.query if parsed.query else ''), safe='')
return (domain, path)
def reconstruct_url(domain, path):
"""Reconstruct URL from filesystem components."""
return f'https://{domain}{path}'
def ensure_crawl_dir(app_name, domain, date_slug):
"""Ensure crawl directory exists and return its path."""
base_dir = os.path.join('downloads', app_name, domain, date_slug)
os.makedirs(base_dir, exist_ok=True)
return base_dir
class BrowserAutomation:
"""
Browser Automation Workflow
A workflow that demonstrates Selenium integration for browser automation tasks.
This serves as the primary development ground for Pipulate's browser automation features.
"""
APP_NAME = 'browser'
DISPLAY_NAME = 'Browser Automation 🤖'
ENDPOINT_MESSAGE = "Open URLs using Selenium for browser automation. This workflow demonstrates Pipulate's browser automation capabilities."
TRAINING_PROMPT = 'This workflow showcases browser automation using Selenium. It uses webdriver-manager for cross-platform compatibility and provides a foundation for developing more advanced automation features.'
def __init__(self, app, pipulate, pipeline, db, app_name=APP_NAME):
"""Initialize the workflow, define steps, and register routes."""
self.app = app
self.app_name = app_name
self.pipulate = pipulate
self.pipeline = pipeline
self.steps_indices = {}
pip = self.pipulate
self.message_queue = pip.message_queue
steps = [Step(id='step_01', done='url', show='Enter URL', refill=True), Step(id='step_02', done='placeholder', show='Placeholder Step', refill=True), Step(id='step_03', done='session_test_complete', show='Ephemeral Login Test', refill=False), Step(id='step_04', done='persistent_session_test_complete', show='Persistent Login Test', refill=False), Step(id='step_05', done='placeholder', show='Step 5 Placeholder', refill=False)]
routes = [(f'/{app_name}', self.landing), (f'/{app_name}/init', self.init, ['POST']), (f'/{app_name}/revert', self.handle_revert, ['POST']), (f'/{app_name}/finalize', self.finalize, ['GET', 'POST']), (f'/{app_name}/unfinalize', self.unfinalize, ['POST']), (f'/{app_name}/reopen_url', self.reopen_url, ['POST'])]
self.steps = steps
for step in steps:
step_id = step.id
routes.append((f'/{app_name}/{step_id}', getattr(self, step_id)))
routes.append((f'/{app_name}/{step_id}_submit', getattr(self, f'{step_id}_submit'), ['POST']))
if step_id in ['step_03', 'step_04']:
routes.append((f'/{app_name}/{step_id}_confirm', getattr(self, f'{step_id}_confirm'), ['POST']))
for path, handler, *methods in routes:
method_list = methods[0] if methods else ['GET']
app.route(path, methods=method_list)(handler)
self.step_messages = {'finalize': {'ready': 'All steps complete. Ready to finalize workflow.', 'complete': f'Workflow finalized. Use {pip.UNLOCK_BUTTON_LABEL} to make changes.'}}
for step in steps:
self.step_messages[step.id] = {'input': f'{pip.fmt(step.id)}: Please complete {step.show}.', 'complete': f'{step.show} complete. Continue to next step.'}
steps.append(Step(id='finalize', done='finalized', show='Finalize', refill=False))
self.steps_indices = {step.id: i for i, step in enumerate(steps)}
async def landing(self, request):
"""Generate the landing page using the standardized helper while maintaining WET explicitness."""
pip = self.pipulate
# Use centralized landing page helper - maintains WET principle by explicit call
return pip.create_standard_landing_page(self)
async def init(self, request):
"""Handles the key submission, initializes state, and renders the step UI placeholders."""
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
form = await request.form()
user_input = form.get('pipeline_id', '').strip()
if not user_input:
from starlette.responses import Response
response = Response('')
response.headers['HX-Refresh'] = 'true'
return response
context = pip.get_plugin_context(self)
profile_name = context['profile_name'] or 'default'
plugin_name = app_name # Use app_name directly to ensure consistency
profile_part = profile_name.replace(' ', '_')
plugin_part = plugin_name.replace(' ', '_')
expected_prefix = f'{profile_part}-{plugin_part}-'
if user_input.startswith(expected_prefix):
pipeline_id = user_input
else:
_, prefix, user_provided_id = pip.generate_pipeline_key(self, user_input)
pipeline_id = f'{prefix}{user_provided_id}'
pip.db['pipeline_id'] = pipeline_id
state, error = pip.initialize_if_missing(pipeline_id, {'app_name': app_name})
if error:
return error
await self.message_queue.add(pip, f'Workflow ID: {pipeline_id}', verbatim=True, spaces_before=0)
await self.message_queue.add(pip, f"Return later by selecting '{pipeline_id}' from the dropdown.", verbatim=True, spaces_before=0)
return pip.run_all_cells(app_name, steps)
async def finalize(self, request):
"""Handles GET request to show Finalize button and POST request to lock the workflow."""
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
pipeline_id = pip.db.get('pipeline_id', 'unknown')
finalize_step = steps[-1]
finalize_data = pip.get_step_data(pipeline_id, finalize_step.id, {})
if request.method == 'GET':
if finalize_step.done in finalize_data:
return Card(H3('Workflow is locked.'), Form(Button(pip.UNLOCK_BUTTON_LABEL, type='submit', cls='secondary outline'), hx_post=f'/{app_name}/unfinalize', hx_target=f'#{finalize_step.id}'), id=finalize_step.id)
else:
all_steps_complete = all((pip.get_step_data(pipeline_id, step.id, {}).get(step.done) for step in steps[:-1]))
if all_steps_complete:
return Card(H3('All steps complete. Finalize?'), P('You can revert to any step and make changes.', cls='text-secondary'), Form(Button('Finalize 🔒', type='submit', cls='primary'), hx_post=f'/{app_name}/finalize', hx_target=f'#{finalize_step.id}'), id=finalize_step.id)
else:
return Div(id=finalize_step.id)
else:
state = pip.read_state(pipeline_id)
for step in steps[:-1]:
step_data = pip.get_step_data(pipeline_id, step.id, {})
if step.done in step_data:
state[step.id] = step_data
state['finalize'] = {'finalized': True}
state['updated'] = datetime.now().isoformat()
pip.write_state(pipeline_id, state)
await self.message_queue.add(pip, self.step_messages['finalize']['complete'], verbatim=True)
return Card(H3('Workflow is locked.'), Form(Button(pip.UNLOCK_BUTTON_LABEL, type='submit', cls='secondary outline'), hx_post=f'/{app_name}/unfinalize', hx_target=f'#{finalize_step.id}'), id=finalize_step.id)
async def unfinalize(self, request):
"""Handles POST request to unlock the workflow."""
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
pipeline_id = pip.db.get('pipeline_id', 'unknown')
state = pip.read_state(pipeline_id)
if 'finalize' in state:
del state['finalize']
for step in steps[:-1]:
if step.id in state and step.done in state[step.id]:
pass
pip.write_state(pipeline_id, state)
await self.message_queue.add(pip, 'Workflow unfinalized! You can now revert to any step and make changes.', verbatim=True)
return Card(H3('All steps complete. Finalize?'), P('You can revert to any step and make changes.', cls='text-secondary'), Form(Button('Finalize 🔒', type='submit', cls='primary'), hx_post=f'/{app_name}/finalize', hx_target=f'#{steps[-1].id}'), id=steps[-1].id)
async def get_suggestion(self, step_id, state):
"""Gets a suggested input value for a step, often using the previous step's transformed output."""
pip, db, steps = (self.pipulate, self.pipulate.db, self.steps)
step = next((s for s in steps if s.id == step_id), None)
if not step or not step.transform:
return ''
prev_index = self.steps_indices[step_id] - 1
if prev_index < 0:
return ''
prev_step = steps[prev_index]
prev_data = pip.get_step_data(pip.db['pipeline_id'], prev_step.id, {})
prev_value = prev_data.get(prev_step.done, '')
return step.transform(prev_value) if prev_value else ''
async def handle_revert(self, request):
"""Handles POST request to revert to a previous step, clearing subsequent step data."""
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
form = await request.form()
step_id = form.get('step_id')
pipeline_id = pip.db.get('pipeline_id', 'unknown')
if not step_id:
return P('Error: No step specified', cls='text-invalid')
await pip.clear_steps_from(pipeline_id, step_id, steps)
state = pip.read_state(pipeline_id)
if step_id == 'step_03':
step_data = state.get(step_id, {})
if 'session_test_confirmed' in step_data:
del step_data['session_test_confirmed']
state[step_id] = step_data
elif step_id == 'step_04':
step_data = state.get(step_id, {})
if 'persistent_session_test_confirmed' in step_data:
del step_data['persistent_session_test_confirmed']
state[step_id] = step_data
state['_revert_target'] = step_id
pip.write_state(pipeline_id, state)
message = await pip.get_state_message(pipeline_id, steps, self.step_messages)
await self.message_queue.add(pip, message, verbatim=True)
return pip.run_all_cells(app_name, steps)
async def step_01(self, request):
"""Handles GET request for Open URL step."""
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
step_id = 'step_01'
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else 'finalize'
pipeline_id = pip.db.get('pipeline_id', 'unknown')
state = pip.read_state(pipeline_id)
step_data = pip.get_step_data(pipeline_id, step_id, {})
url_value = step_data.get(step.done, '')
finalize_data = pip.get_step_data(pipeline_id, 'finalize', {})
if 'finalized' in finalize_data and url_value:
return Div(Card(H3(f'🔒 Open URL'), P(f'URL opened (and closed): ', B(url_value)), Div(id=f'{step_id}-status')), Div(id=next_step_id, hx_get=f'/{app_name}/{next_step_id}', hx_trigger='load'), id=step_id)
elif url_value and state.get('_revert_target') != step_id:
content_container = pip.display_revert_widget(step_id=step_id, app_name=app_name, message=f'Open URL: {url_value}', widget=Div(P(f'URL opened (and closed): ', B(url_value)), Div(id=f'{step_id}-status')), steps=steps)
return Div(content_container, Div(id=next_step_id, hx_get=f'/{app_name}/{next_step_id}', hx_trigger='load'), id=step_id)
else:
await self.message_queue.add(pip, 'Enter the URL you want to open with Selenium:', verbatim=True)
display_value = url_value if step.refill and url_value else 'https://example.com'
return Div(Card(H3('Open URL'), Form(Input(type='url', name='url', placeholder='https://example.com', required=True, value=display_value, cls='contrast'), Button('Open URL', type='submit', cls='primary'), hx_post=f'/{app_name}/{step_id}_submit', hx_target=f'#{step_id}')), Div(id=next_step_id), id=step_id)
async def step_01_submit(self, request):
"""Process the Open URL submission and open it with Selenium."""
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
step_id = 'step_01'
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else 'finalize'
pipeline_id = pip.db.get('pipeline_id', 'unknown')
form = await request.form()
url = form.get('url', '').strip()
if not url:
return P('Error: URL is required', cls='text-invalid')
if not url.startswith(('http://', 'https://')):
url = f'https://{url}'
await pip.set_step_data(pipeline_id, step_id, url, steps)
try:
chrome_options = Options()
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--new-window')
chrome_options.add_argument('--start-maximized')
import tempfile
profile_dir = tempfile.mkdtemp()
chrome_options.add_argument(f'--user-data-dir={profile_dir}')
effective_os = os.environ.get('EFFECTIVE_OS', 'unknown')
await self.message_queue.add(pip, f'Current OS: {effective_os}', verbatim=True)
if effective_os == 'darwin':
await self.message_queue.add(pip, 'Using webdriver-manager for macOS', verbatim=True)
service = Service(ChromeDriverManager().install())
else:
await self.message_queue.add(pip, 'Using system Chrome for Linux', verbatim=True)
service = Service()
await self.message_queue.add(pip, 'Initializing Chrome driver...', verbatim=True)
driver = webdriver.Chrome(service=service, options=chrome_options)
await self.message_queue.add(pip, f'Opening URL with Selenium: {url}', verbatim=True)
driver.get(url)
await asyncio.sleep(2)
title = driver.title
await self.message_queue.add(pip, f'Page loaded successfully. Title: {title}', verbatim=True)
driver.quit()
await self.message_queue.add(pip, 'Browser closed successfully', verbatim=True)
import shutil
shutil.rmtree(profile_dir, ignore_errors=True)
except Exception as e:
error_msg = f'Error opening URL with Selenium: {str(e)}'
logger.error(error_msg)
safe_error_msg = error_msg.replace('<', '&lt;').replace('>', '&gt;')
await self.message_queue.add(pip, safe_error_msg, verbatim=True)
return P(error_msg, cls='text-invalid')
url_widget = Div(P(f'URL opened (and closed): ', B(url)), Div(id=f'{step_id}-status'))
content_container = pip.display_revert_widget(step_id=step_id, app_name=app_name, message=f'Open URL: {url}', widget=url_widget, steps=steps)
return Div(content_container, Div(id=next_step_id, hx_get=f'/{app_name}/{next_step_id}', hx_trigger='load'), id=step_id)
async def reopen_url(self, request):
"""Handle reopening a URL with Selenium."""
pip, db = (self.pipulate, self.pipulate.db)
form = await request.form()
url = form.get('url', '').strip()
if not url:
return P('Error: URL is required', cls='text-invalid')
try:
chrome_options = Options()
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--new-window')
chrome_options.add_argument('--start-maximized')
import tempfile
profile_dir = tempfile.mkdtemp()
chrome_options.add_argument(f'--user-data-dir={profile_dir}')
effective_os = os.environ.get('EFFECTIVE_OS', 'unknown')
await self.message_queue.add(pip, f'Current OS: {effective_os}', verbatim=True)
if effective_os == 'darwin':
await self.message_queue.add(pip, 'Using webdriver-manager for macOS', verbatim=True)
service = Service(ChromeDriverManager().install())
else:
await self.message_queue.add(pip, 'Using system Chrome for Linux', verbatim=True)
service = Service()
await self.message_queue.add(pip, 'Initializing Chrome driver...', verbatim=True)
driver = webdriver.Chrome(service=service, options=chrome_options)
await self.message_queue.add(pip, f'Reopening URL with Selenium: {url}', verbatim=True)
driver.get(url)
await asyncio.sleep(2)
title = driver.title
await self.message_queue.add(pip, f'Page loaded successfully. Title: {title}', verbatim=True)
driver.quit()
await self.message_queue.add(pip, 'Browser closed successfully', verbatim=True)
import shutil
shutil.rmtree(profile_dir, ignore_errors=True)
return P(f'Successfully reopened: {url}', style='color: green;')
except Exception as e:
error_msg = f'Error reopening URL with Selenium: {str(e)}'
logger.error(error_msg)
await self.message_queue.add(pip, error_msg, verbatim=True)
return P(error_msg, cls='text-invalid')
async def step_02(self, request):
"""Handles GET request for Crawl URL step (identical to Step 1, independent state, crawl semantics)."""
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
step_id = 'step_02'
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else 'finalize'
pipeline_id = pip.db.get('pipeline_id', 'unknown')
state = pip.read_state(pipeline_id)
step_data = pip.get_step_data(pipeline_id, step_id, {})
url_value = step_data.get(step.done, '')
finalize_data = pip.get_step_data(pipeline_id, 'finalize', {})
if 'finalized' in finalize_data and url_value:
return Div(Card(H3(f'🔒 Crawl URL'), P(f'URL crawled and saved: ', B(url_value.get('url', ''))), Div(id=f'{step_id}-status')), Div(id=next_step_id, hx_get=f'/{app_name}/{next_step_id}', hx_trigger='load'), id=step_id)
elif url_value and state.get('_revert_target') != step_id:
content_container = pip.display_revert_widget(step_id=step_id, app_name=app_name, message=f"Crawl URL: {url_value.get('url', '')}", widget=Div(P(f'URL crawled and saved: ', B(url_value.get('url', ''))), P(f"Title: {url_value.get('title', '')}"), P(f"Status: {url_value.get('status', '')}"), P(f"Saved to: {url_value.get('save_path', '')}"), P(f"Reconstructed URL: {url_value.get('reconstructed_url', '')}", cls='text-secondary'), Div(id=f'{step_id}-status')), steps=steps)
return Div(content_container, Div(id=next_step_id, hx_get=f'/{app_name}/{next_step_id}', hx_trigger='load'), id=step_id)
else:
await self.message_queue.add(pip, 'Enter the URL you want to crawl:', verbatim=True)
display_value = ''
if step.refill and url_value:
if isinstance(url_value, dict) and 'url' in url_value:
display_value = url_value['url']
else:
display_value = url_value
if not display_value:
display_value = 'https://example.com'
return Div(Card(H3('Crawl URL'), Form(Input(type='url', name='url', placeholder='https://example.com', required=True, value=display_value, cls='contrast'), Button('Crawl URL', type='submit', cls='primary'), hx_post=f'/{app_name}/{step_id}_submit', hx_target=f'#{step_id}')), Div(id=next_step_id), id=step_id)
async def step_02_submit(self, request):
"""Process the Crawl URL submission, open with Selenium-wire, and save crawl data."""
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
step_id = 'step_02'
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else 'finalize'
pipeline_id = pip.db.get('pipeline_id', 'unknown')
form = await request.form()
url = form.get('url', '').strip()
if not url:
return P('Error: URL is required', cls='text-invalid')
if not url.startswith(('http://', 'https://')):
url = f'https://{url}'
try:
chrome_options = Options()
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--new-window')
chrome_options.add_argument('--start-maximized')
import tempfile
profile_dir = tempfile.mkdtemp()
chrome_options.add_argument(f'--user-data-dir={profile_dir}')
effective_os = os.environ.get('EFFECTIVE_OS', 'unknown')
await self.message_queue.add(pip, f'Current OS: {effective_os}', verbatim=True)
if effective_os == 'darwin':
await self.message_queue.add(pip, 'Using webdriver-manager for macOS', verbatim=True)
service = Service(ChromeDriverManager().install())
else:
await self.message_queue.add(pip, 'Using system Chrome for Linux', verbatim=True)
service = Service()
await self.message_queue.add(pip, 'Initializing Chrome driver...', verbatim=True)
driver = wire_webdriver.Chrome(service=service, options=chrome_options)
await self.message_queue.add(pip, f'Crawling URL with Selenium: {url}', verbatim=True)
driver.get(url)
await asyncio.sleep(2)
title = driver.title
source = driver.page_source
dom = driver.execute_script('return document.documentElement.outerHTML;')
main_request = None
for request in driver.requests:
if request.response and request.url.startswith(url):
main_request = request
break
if not main_request:
for request in driver.requests:
if request.response:
main_request = request
break
if main_request and main_request.response:
headers = dict(main_request.response.headers)
status = main_request.response.status_code
else:
headers = {}
status = 200
domain, path = get_safe_path(url)
date_slug = datetime.now().strftime('%Y%m%d')
base_dir = ensure_crawl_dir(app_name, domain, date_slug)
crawl_dir = os.path.join(base_dir, path)
os.makedirs(crawl_dir, exist_ok=True)
with open(os.path.join(crawl_dir, 'headers.json'), 'w') as f:
json.dump(headers, f, indent=2)
with open(os.path.join(crawl_dir, 'source.html'), 'w') as f:
f.write(source)
with open(os.path.join(crawl_dir, 'dom.html'), 'w') as f:
f.write(dom)
driver.quit()
await self.message_queue.add(pip, 'Browser closed successfully', verbatim=True)
import shutil
shutil.rmtree(profile_dir, ignore_errors=True)
reconstructed_url = reconstruct_url(domain, path)
crawl_data = {'url': url, 'title': title, 'status': status, 'save_path': crawl_dir, 'timestamp': datetime.now().isoformat(), 'reconstructed_url': reconstructed_url}
await pip.set_step_data(pipeline_id, step_id, crawl_data, steps)
await self.message_queue.add(pip, f'{step.show} complete.', verbatim=True)
url_widget = Div(P(f'URL crawled and saved: ', B(crawl_data['url'])), P(f'Title: {title}'), P(f'Status: {status}'), P(f'Saved to: {crawl_dir}'), P(f'Reconstructed URL: {reconstructed_url}', cls='text-secondary'), Div(id=f'{step_id}-status'))
content_container = pip.display_revert_widget(step_id=step_id, app_name=app_name, message=f"Crawl URL: {crawl_data['url']}", widget=url_widget, steps=steps)
return Div(content_container, Div(id=next_step_id, hx_get=f'/{app_name}/{next_step_id}', hx_trigger='load'), id=step_id)
except Exception as e:
error_msg = f'Error crawling URL with Selenium: {str(e)}'
logger.error(error_msg)
safe_error_msg = error_msg.replace('<', '&lt;').replace('>', '&gt;')
await self.message_queue.add(pip, safe_error_msg, verbatim=True)
return P(error_msg, cls='text-invalid')
def _get_selenium_profile_paths(self, pipeline_id: str, desired_profile_leaf_name: str = 'google_session') -> tuple[str, str]:
"""Get the user data directory and profile directory paths for Chrome.
Returns a tuple of (user_data_dir_path, profile_directory_name) where:
- user_data_dir_path is the parent directory for Chrome's user data
- profile_directory_name is the specific profile to use within that directory
"""
from pathlib import Path
user_data_root = Path('data') / self.app_name / 'selenium_user_data'
user_data_root.mkdir(parents=True, exist_ok=True)
return (str(user_data_root), 'google_session')
def _get_persistent_profile_paths(self, pipeline_id: str) -> tuple[str, str]:
"""Get the persistent user data directory and profile directory paths for Chrome.
This version uses a fixed location that won't be cleared on server restart.
"""
from pathlib import Path
user_data_root = Path('data') / self.app_name / 'persistent_profiles'
user_data_root.mkdir(parents=True, exist_ok=True)
return (str(user_data_root), 'google_session')
async def step_03(self, request):
"""Handles GET request for Ephemeral Login Test."""
pipeline_id = self.pipulate.db.get('pipeline_id', 'unknown')
if not pipeline_id or pipeline_id == 'unknown':
return JSONResponse(status_code=400, content={'error': 'No pipeline ID found in db'})
user_data_dir, profile_dir = self._get_selenium_profile_paths(pipeline_id)
step_data = self.pipulate.get_step_data(pipeline_id, 'step_03', {})
is_completed = step_data.get('session_test_complete', False)
is_confirmed = step_data.get('session_test_confirmed', False)
step_index = self.steps_indices['step_03']
next_step_id = self.steps[step_index + 1].id if step_index < len(self.steps) - 1 else 'finalize'
state = self.pipulate.read_state(pipeline_id)
is_being_reverted = state.get('_revert_target') == 'step_03'
if is_confirmed:
return Div(self.pipulate.display_revert_header(step_id='step_03', app_name=self.app_name, message='Ephemeral Login Test', steps=self.steps), Div(id=next_step_id, hx_get=f'/{self.app_name}/{next_step_id}', hx_trigger='load'), id='step_03')
elif is_completed and (not is_being_reverted):
return Div(Card(H3('Ephemeral Login Test'), P('✅ Test completed!'), P('Please confirm that you have successfully logged in and verified the session persistence.'), P(f'Profile directory: {user_data_dir}/{profile_dir}'), P('Note: This profile will be cleared when the server restarts.', style='color: #666; font-style: italic;'), Form(Button('Check Login Status', type='submit', cls='secondary'), hx_post=f'/{self.app_name}/step_03_submit', hx_target='#step_03'), Form(Button('Confirm Test Completion', type='submit', cls='primary'), hx_post=f'/{self.app_name}/step_03_confirm', hx_target='#step_03')), Div(id=next_step_id, hx_get=f'/{self.app_name}/{next_step_id}', hx_trigger='load'), id='step_03')
else:
return Div(Card(H3('Ephemeral Login Test'), P('Instructions:'), P('1. Click the button below to open Google in a new browser window'), P('2. Log in to your Google account'), P('3. Close the browser window when done'), P('4. Return here to check your session status'), P('Note: This profile will be cleared when the server restarts.', style='color: #666; font-style: italic;'), Form(Button('Open Google & Log In', type='submit', cls='primary'), hx_post=f'/{self.app_name}/step_03_submit', hx_target='#step_03')), id='step_03')
async def step_03_submit(self, request):
"""Handles POST request for Ephemeral Login Test."""
try:
pipeline_id = self.pipulate.db.get('pipeline_id', 'unknown')
if not pipeline_id or pipeline_id == 'unknown':
return JSONResponse(status_code=400, content={'error': 'No pipeline ID found in db'})
user_data_dir, profile_dir = self._get_selenium_profile_paths(pipeline_id)
step_data = self.pipulate.get_step_data(pipeline_id, 'step_03', {})
is_completed = step_data.get('session_test_complete', False)
chrome_options = Options()
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--window-size=1920,1080')
chrome_options.add_argument(f'--user-data-dir={user_data_dir}')
chrome_options.add_argument(f'--profile-directory={profile_dir}')
chrome_options.add_argument('--disable-blink-features=AutomationControlled')
chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
chrome_options.add_experimental_option('useAutomationExtension', False)
driver = webdriver.Chrome(options=chrome_options)
driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {'source': "\n Object.defineProperty(navigator, 'webdriver', {\n get: () => undefined\n });\n Object.defineProperty(navigator, 'apps', {\n get: () => [1, 2, 3, 4, 5]\n });\n Object.defineProperty(navigator, 'languages', {\n get: () => ['en-US', 'en']\n });\n "})
try:
driver.get('https://www.google.com')
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.NAME, 'q')))
try:
profile_pic = WebDriverWait(driver, 0.5).until(EC.presence_of_element_located((By.CSS_SELECTOR, "img[alt*='Google Account']")))
is_logged_in = True
login_status = '✅ Logged In'
except TimeoutException:
is_logged_in = False
login_status = '❌ Not Logged In'
step_data['session_test_complete'] = True
step_data['is_logged_in'] = is_logged_in
step_data['user_data_dir'] = user_data_dir
step_data['profile_dir'] = profile_dir
state = self.pipulate.read_state(pipeline_id)
state['step_03'] = step_data
self.pipulate.write_state(pipeline_id, state)
return Div(Card(H3('Ephemeral Login Test'), P('Instructions:'), P('1. A new browser window has opened with Google'), P('2. Log in to your Google account in that window'), P('3. After logging in, close the browser window'), P('4. Return here and click the button below to confirm test completion'), P(f'Current Status: {login_status}'), Form(Button('Check Login Status', type='submit', cls='secondary'), hx_post=f'/{self.app_name}/step_03_submit', hx_target='#step_03'), Form(Button('Confirm Test Completion', type='submit', cls='primary'), hx_post=f'/{self.app_name}/step_03_confirm', hx_target='#step_03')), id='step_03')
except Exception as e:
driver.quit()
raise e
except Exception as e:
return JSONResponse(status_code=500, content={'error': str(e)})
async def step_03_confirm(self, request):
"""Handle confirmation of Ephemeral Login Test."""
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
step_id = 'step_03'
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else 'finalize'
pipeline_id = pip.db.get('pipeline_id', 'unknown')
state = pip.read_state(pipeline_id)
step_data = state.get(step_id, {})
step_data[step.done] = True
step_data['session_test_confirmed'] = True
state[step_id] = step_data
pip.write_state(pipeline_id, state)
await self.message_queue.add(pip, 'Ephemeral login test confirmed!', verbatim=True)
return Div(pip.display_revert_header(step_id=step_id, app_name=app_name, message='Ephemeral Login Test', steps=steps), Div(id=next_step_id, hx_get=f'/{app_name}/{next_step_id}', hx_trigger='load'), id=step_id)
async def step_04(self, request):
"""Handles GET request for Persistent Login Test."""
pipeline_id = self.pipulate.db.get('pipeline_id', 'unknown')
if not pipeline_id or pipeline_id == 'unknown':
return JSONResponse(status_code=400, content={'error': 'No pipeline ID found in db'})
user_data_dir, profile_dir = self._get_persistent_profile_paths(pipeline_id)
step_data = self.pipulate.get_step_data(pipeline_id, 'step_04', {})
is_completed = step_data.get('persistent_session_test_complete', False)
is_confirmed = step_data.get('persistent_session_test_confirmed', False)
step_index = self.steps_indices['step_04']
next_step_id = self.steps[step_index + 1].id if step_index < len(self.steps) - 1 else 'finalize'
state = self.pipulate.read_state(pipeline_id)
is_being_reverted = state.get('_revert_target') == 'step_04'
if is_confirmed:
return Div(self.pipulate.display_revert_header(step_id='step_04', app_name=self.app_name, message='Persistent Login Test', steps=self.steps), Div(id=next_step_id, hx_get=f'/{self.app_name}/{next_step_id}', hx_trigger='load'), id='step_04')
elif is_completed and (not is_being_reverted):
return Div(Card(H3('Persistent Login Test'), P('✅ Test completed!'), P('Please confirm that you have successfully logged in and verified the session persistence.'), P(f'Profile directory: {user_data_dir}/{profile_dir}'), P('Note: This profile will persist across server restarts.', style='color: #666; font-style: italic;'), Form(Button('Check Login Status', type='submit', cls='secondary'), hx_post=f'/{self.app_name}/step_04_submit', hx_target='#step_04'), Form(Button('Confirm Test Completion', type='submit', cls='primary'), hx_post=f'/{self.app_name}/step_04_confirm', hx_target='#step_04')), Div(id=next_step_id, hx_get=f'/{self.app_name}/{next_step_id}', hx_trigger='load'), id='step_04')
else:
return Div(Card(H3('Persistent Login Test'), P('Instructions:'), P('1. Click the button below to open Google in a new browser window'), P('2. Log in to your Google account'), P('3. Close the browser window when done'), P('4. Return here to check your session status'), P('Note: This profile will persist across server restarts.', style='color: #666; font-style: italic;'), Form(Button('Open Google & Log In', type='submit', cls='primary'), hx_post=f'/{self.app_name}/step_04_submit', hx_target='#step_04')), id='step_04')
async def step_04_submit(self, request):
"""Handles POST request for Persistent Login Test."""
try:
pipeline_id = self.pipulate.db.get('pipeline_id', 'unknown')
if not pipeline_id or pipeline_id == 'unknown':
return JSONResponse(status_code=400, content={'error': 'No pipeline ID found in db'})
user_data_dir, profile_dir = self._get_persistent_profile_paths(pipeline_id)
step_data = self.pipulate.get_step_data(pipeline_id, 'step_04', {})
is_completed = step_data.get('persistent_session_test_complete', False)
chrome_options = Options()
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--window-size=1920,1080')
chrome_options.add_argument(f'--user-data-dir={user_data_dir}')
chrome_options.add_argument(f'--profile-directory={profile_dir}')
chrome_options.add_argument('--disable-blink-features=AutomationControlled')
chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
chrome_options.add_experimental_option('useAutomationExtension', False)
driver = webdriver.Chrome(options=chrome_options)
driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {'source': "\n Object.defineProperty(navigator, 'webdriver', {\n get: () => undefined\n });\n Object.defineProperty(navigator, 'apps', {\n get: () => [1, 2, 3, 4, 5]\n });\n Object.defineProperty(navigator, 'languages', {\n get: () => ['en-US', 'en']\n });\n "})
try:
driver.get('https://www.google.com')
WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.NAME, 'q')))
try:
profile_pic = WebDriverWait(driver, 0.5).until(EC.presence_of_element_located((By.CSS_SELECTOR, "img[alt*='Google Account']")))
is_logged_in = True
login_status = '✅ Logged In'
except TimeoutException:
is_logged_in = False
login_status = '❌ Not Logged In'
step_data['persistent_session_test_complete'] = True
step_data['is_logged_in'] = is_logged_in
step_data['user_data_dir'] = user_data_dir
step_data['profile_dir'] = profile_dir
state = self.pipulate.read_state(pipeline_id)
state['step_04'] = step_data
self.pipulate.write_state(pipeline_id, state)
return Div(Card(H3('Persistent Login Test'), P('Instructions:'), P('1. A new browser window has opened with Google'), P('2. Log in to your Google account in that window'), P('3. After logging in, close the browser window'), P('4. Return here and click the button below to confirm test completion'), P(f'Current Status: {login_status}'), Form(Button('Check Login Status', type='submit', cls='secondary'), hx_post=f'/{self.app_name}/step_04_submit', hx_target='#step_04'), Form(Button('Confirm Test Completion', type='submit', cls='primary'), hx_post=f'/{self.app_name}/step_04_confirm', hx_target='#step_04')), id='step_04')
except Exception as e:
driver.quit()
raise e
except Exception as e:
return JSONResponse(status_code=500, content={'error': str(e)})
async def step_04_confirm(self, request):
"""Handle confirmation of Persistent Login Test."""
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
step_id = 'step_04'
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else 'finalize'
pipeline_id = pip.db.get('pipeline_id', 'unknown')
state = pip.read_state(pipeline_id)
step_data = state.get(step_id, {})
step_data[step.done] = True
step_data['persistent_session_test_confirmed'] = True
state[step_id] = step_data
pip.write_state(pipeline_id, state)
await self.message_queue.add(pip, 'Persistent login test confirmed!', verbatim=True)
return Div(pip.display_revert_header(step_id=step_id, app_name=app_name, message='Persistent Login Test', steps=steps), Div(id=next_step_id, hx_get=f'/{app_name}/{next_step_id}', hx_trigger='load'), id=step_id)
async def step_05(self, request):
"""Handles GET request for Step 5 placeholder."""
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
step_id = 'step_05'
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else 'finalize'
pipeline_id = pip.db.get('pipeline_id', 'unknown')
state = pip.read_state(pipeline_id)
step_data = pip.get_step_data(pipeline_id, step_id, {})
placeholder_value = step_data.get(step.done, '')
finalize_data = pip.get_step_data(pipeline_id, 'finalize', {})
if 'finalized' in finalize_data and placeholder_value:
pip.append_to_history(f'[WIDGET CONTENT] {step.show} (Finalized):\n{placeholder_value}')
return Div(Card(H3(f'🔒 {step.show}: Completed')), Div(id=next_step_id, hx_get=f'/{app_name}/{next_step_id}', hx_trigger='load'), id=step_id)
if placeholder_value and state.get('_revert_target') != step_id:
pip.append_to_history(f'[WIDGET CONTENT] {step.show} (Completed):\n{placeholder_value}')
return Div(pip.display_revert_header(step_id=step_id, app_name=app_name, message=f'{step.show}: Complete', steps=steps), Div(id=next_step_id, hx_get=f'/{app_name}/{next_step_id}', hx_trigger='load'), id=step_id)
else:
pip.append_to_history(f'[WIDGET STATE] {step.show}: Showing input form')
await self.message_queue.add(pip, self.step_messages[step_id]['input'], verbatim=True)
return Div(Card(H3(f'{step.show}'), P('This is a placeholder step. Click Proceed to continue to the next step.'), Form(Button('Next ▸', type='submit', cls='primary'), hx_post=f'/{app_name}/{step_id}_submit', hx_target=f'#{step_id}')), Div(id=next_step_id), id=step_id)
async def step_05_submit(self, request):
"""Process the submission for Step 5 placeholder."""
pip, steps, app_name = (self.pipulate, self.steps, self.app_name)
step_id = 'step_05'
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else 'finalize'
pipeline_id = pip.db.get('pipeline_id', 'unknown')
placeholder_value = 'completed'
await pip.set_step_data(pipeline_id, step_id, placeholder_value, steps)
pip.append_to_history(f'[WIDGET CONTENT] {step.show}:\n{placeholder_value}')
pip.append_to_history(f'[WIDGET STATE] {step.show}: Step completed')
await self.message_queue.add(pip, f'{step.show} complete.', verbatim=True)
return Div(pip.display_revert_header(step_id=step_id, app_name=app_name, message=f'{step.show}: Complete', steps=steps), Div(id=next_step_id, hx_get=f'/{app_name}/{next_step_id}', hx_trigger='load'), id=step_id)
The Refactoring Imperative: Lessons Learned
These both just work so much better and more cleanly than the tool file I now have, with all its outdented string parsing. Let me package up all the necessary bits and do some Prompt Fu in a brand new, unconfused session. Both I and the AI are confused, ahaha! But the thing is, I’m now using the tool-calling architecture at least, so I can transpose the methodology from the cleaner approach over there and do some further bull-in-a-china-shop work: wiping out what’s there and transposing the good parts in. Okay, that’s a plan. To draw the full context picture, I need to show what I’m doing over there in JupyterLab, which means the secretsauce.py module and the two .ipynb workflow notebooks, which will now be converted on the fly into .py files when they go into the Prompt Fu concatenated stack.
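The on-the-fly notebook conversion is conceptually simple. As a hedged sketch of the idea (not the actual prompt_foo.py implementation), a bundler can use jupytext to render each .ipynb as percent-format Python before concatenating it into the prompt:

```python
# Hypothetical sketch of notebook-to-.py conversion for a prompt bundle.
# This is not the real prompt_foo.py; it only illustrates the conversion step.
from pathlib import Path

import jupytext  # assumes jupytext is available in the environment


def notebook_as_py(path: Path) -> str:
    """Read an .ipynb file and return its cells as percent-format Python."""
    nb = jupytext.read(path)
    return jupytext.writes(nb, fmt="py:percent")


def bundle(paths: list[Path]) -> str:
    """Concatenate files for the prompt, converting notebooks on the fly."""
    parts = []
    for p in paths:
        text = notebook_as_py(p) if p.suffix == ".ipynb" else p.read_text()
        parts.append(f"# --- {p} ---\n{text}")
    return "\n\n".join(parts)
```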
And then I need to show how the magic wand works, which covers most of what the from pipulate import pip command in the Notebooks pulls in:
/home/mike/repos/pipulate/pipulate/__init__.py
/home/mike/repos/pipulate/pipulate/core.py
/home/mike/repos/pipulate/pipulate/pipulate.py
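I won't guess at the internals here, but the general pattern behind that import is the familiar package-level singleton: the package __init__ builds a ready-to-use object so a notebook can import it directly. A minimal hypothetical sketch of that pattern (not the actual contents of pipulate/__init__.py; the Pipulate class name is assumed):

```python
# Hypothetical sketch of the "magic wand" pattern: a package-level singleton.
# The real pipulate/__init__.py and core.py are richer; names here are assumed.
from .core import Pipulate  # assumed class living in pipulate/core.py

# Built once at import time so a notebook can simply do:
#   from pipulate import pip
pip = Pipulate()
```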
And then there’s the way the tool-calling system works, with at least one good example of an auto-registering tool, some insight into how the tool orchestrator works, and the cli.py environment and language control. And then there’s this article itself, with what I really want to ask this formidable machine intelligence to do when it suddenly wakes up into this and sees it all.
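For reference, the auto-registration half of that story usually boils down to a decorator that drops an async callable into a module-level registry at import time, which the orchestrator can then look up by name. A hypothetical sketch of that pattern (registry and decorator names are illustrative, not the actual Pipulate tool API):

```python
# Hypothetical sketch of an auto-registering tool pattern.
# Registry and decorator names are illustrative, not the actual Pipulate API.
from typing import Awaitable, Callable, Dict

TOOL_REGISTRY: Dict[str, Callable[..., Awaitable[dict]]] = {}


def auto_tool(func: Callable[..., Awaitable[dict]]):
    """Register an async tool under its function name at import time."""
    TOOL_REGISTRY[func.__name__] = func
    return func


@auto_tool
async def say_hello(name: str = "world") -> dict:
    """A trivial tool the orchestrator could look up and await by name."""
    return {"success": True, "message": f"Hello, {name}!"}
```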
Hi there, Gemini! What you see trying to occur per:
/home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/workflow_test.ipynb
…in terms of browser automation actually works using the mess of things that I made in:
/home/mike/repos/pipulate/tools/scraper_tools.py
…in an exercise in shooting myself in the foot: I copied a complete session hijacker I had built, from which scraper_tools.py was extracted as a supposedly simpler version, believe it or not. I know, this is a case of captain dumbass, especially considering the much more stable and svelte versions of Selenium-based browser automation that I have in:
/home/mike/repos/pipulate/apps/820_selenium.py
/home/mike/repos/pipulate/apps/440_browser_automation.py
…both of which work great and should have been my starting point, which would have saved me much grief. A case of “it doesn’t need to be that complicated.”
So review what I have and get the gist of what I’m doing, which is controlling a simple scrape of a page. There’s more to it, including which files get generated and where they get saved, but you don’t have to figure it all out in one pass. We are setting the clear priority here: a radical simplification of:
/home/mike/repos/pipulate/tools/scraper_tools.py
…using what’s found in:
/home/mike/repos/pipulate/apps/820_selenium.py
/home/mike/repos/pipulate/apps/440_browser_automation.py
…and then we can work on the polish in iterative passes. This is a request for an implementation plan, please and thank you! I generally don’t want entire solutions generated wholesale because of how much creative drift and smoothing-out there might be, but in this case I’m open to swapping out scraper_tools.py entirely with something you create. We can rapidly test and iterate in the Notebook. So while you’re updating the scraper tools file, make sure that when it gets called from the workflow notebook via the secretsauce module, the thing it hits is what you’re being asked to produce now. And I know all about resetting the kernel to reload the module in the notebook REPL environment. Please and thank you!
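To make the target concrete, here is a hedged sketch of the shape the simplified tool could take, lifted directly from the patterns the two apps already use: a throwaway temp profile, webdriver-manager on macOS, and a deterministic date-and-domain-keyed layout under downloads/ for saved artifacts. The function name selenium_scrape, its signature, and the headless flag are assumptions for illustration, not the final API:

```python
# Hypothetical simplified replacement sketch for tools/scraper_tools.py.
# It reuses the patterns from 820_selenium.py and 440_browser_automation.py;
# the function name, signature, and headless flag are assumptions.
import os
import shutil
import tempfile
from datetime import datetime
from pathlib import Path
from urllib.parse import quote, urlparse

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager


async def selenium_scrape(url: str, headless: bool = True) -> dict:
    """Open a URL, capture title/source/DOM, and save artifacts deterministically."""
    chrome_options = Options()
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    if headless:
        chrome_options.add_argument('--headless=new')
    profile_dir = tempfile.mkdtemp()  # clean, throwaway browser session
    chrome_options.add_argument(f'--user-data-dir={profile_dir}')

    # Same OS split the apps use: webdriver-manager on macOS, system driver elsewhere.
    if os.environ.get('EFFECTIVE_OS', 'unknown') == 'darwin':
        service = Service(ChromeDriverManager().install())
    else:
        service = Service()

    driver = webdriver.Chrome(service=service, options=chrome_options)
    try:
        driver.get(url)
        parsed = urlparse(url)
        path_slug = quote(parsed.path + ('?' + parsed.query if parsed.query else ''), safe='') or '%2F'
        out_dir = Path('downloads') / 'scrape' / parsed.netloc / datetime.now().strftime('%Y%m%d') / path_slug
        out_dir.mkdir(parents=True, exist_ok=True)
        (out_dir / 'source.html').write_text(driver.page_source)
        (out_dir / 'dom.html').write_text(driver.execute_script('return document.documentElement.outerHTML;'))
        return {'success': True, 'url': url, 'title': driver.title, 'save_path': str(out_dir)}
    finally:
        driver.quit()
        shutil.rmtree(profile_dir, ignore_errors=True)
```

A first pass like this keeps the call chain from the notebook (secretsauce calling the tool) intact and leaves the polish, explicit waits, and response-header capture for later iterations.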
[mike@nixos:~/repos/pipulate]$ python prompt_foo.py
--- Processing Files ---
-> Converting notebook: /home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/workflow.ipynb
-> Converting notebook: /home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/workflow_test.ipynb
--- Generating Auto-Context ---
Generating codebase tree diagram... (2,147 tokens)
Python file(s) detected. Generating UML diagrams...
-> Generating for /home/mike/repos/pipulate/apps/820_selenium.py... (263 tokens)
-> Generating for /home/mike/repos/pipulate/apps/440_browser_automation.py... (401 tokens)
-> Generating for /home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/secretsauce.py... (skipped)
-> Generating for /home/mike/repos/pipulate/pipulate/__init__.py... (skipped)
-> Generating for /home/mike/repos/pipulate/pipulate/core.py... (2,115 tokens)
-> Generating for /home/mike/repos/pipulate/pipulate/pipulate.py... (skipped)
-> Generating for /home/mike/repos/pipulate/cli.py... (skipped)
-> Generating for /home/mike/repos/pipulate/tools/__init__.py... (skipped)
-> Generating for /home/mike/repos/pipulate/tools/system_tools.py... (skipped)
-> Generating for /home/mike/repos/pipulate/imports/stream_orchestrator.py... (skipped)
...UML generation complete.
--- Files Included ---
• /home/mike/repos/pipulate/apps/820_selenium.py (3,428 tokens)
• /home/mike/repos/pipulate/apps/440_browser_automation.py (10,285 tokens)
• /home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/secretsauce.py (2,275 tokens)
• /home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/workflow.ipynb (726 tokens)
• /home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/workflow_test.ipynb (785 tokens)
• /home/mike/repos/pipulate/pipulate/__init__.py (129 tokens)
• /home/mike/repos/pipulate/pipulate/core.py (18,834 tokens)
• /home/mike/repos/pipulate/pipulate/pipulate.py (467 tokens)
• /home/mike/repos/pipulate/cli.py (5,092 tokens)
• /home/mike/repos/pipulate/tools/__init__.py (464 tokens)
• /home/mike/repos/pipulate/tools/system_tools.py (707 tokens)
• /home/mike/repos/pipulate/imports/stream_orchestrator.py (1,163 tokens)
--- Auto-Context Included ---
• Codebase Structure (eza --tree) (2,147 tokens)
• UML Class Diagram (ASCII for /home/mike/repos/pipulate/apps/820_selenium.py) (263 tokens)
• UML Class Diagram (ASCII for /home/mike/repos/pipulate/apps/440_browser_automation.py) (401 tokens)
• UML Class Diagram (ASCII for /home/mike/repos/pipulate/pipulate/core.py) (2,115 tokens)
--- Prompt Summary ---
Total Tokens: 64,489
Total Words: 24,395
--- Size Perspective ---
📚 Equivalent in length to a **Novella or a Master's Dissertation** (Note: With a token/word ratio of 2.64, this content is far denser and more complex than typical prose of this length).
Markdown output copied to clipboard
[mike@nixos:~/repos/pipulate]$
Gemini Thinks:
Considering Refactoring Strategies
I'm currently thinking about how to approach the refactoring of scraper_tools.py. The aim is to clarify its browser automation logic. I'm focusing on breaking down the existing functions into smaller, more manageable units. I'm also considering introducing classes to better encapsulate related functionality, making the code more modular and easier to test.
Analyzing Replacement Logic
I've been examining the simpler browser automation logic in 820_selenium.py and 440_browser_automation.py to identify transferable patterns. My focus is on how to integrate the cleaner code into the existing scraper_tools.py structure, ensuring compatibility with the call chain from the Jupyter Notebook. I'm exploring the feasibility of replacing the complex session hijacker-derived logic. The aim is to create a much simpler, more reliable browser automation solution.
Developing Replacement Strategy
I'm now focusing on a more concrete strategy for replacing the existing scraper_tools.py. My approach involves crafting a new version that leverages the simplicity and robustness of the code in 820_selenium.py and 440_browser_automation.py. I aim to design the replacement with smaller, modular functions that are easier to test. I'm prioritizing compatibility with the existing call chain from the Jupyter Notebook while ensuring the solution is significantly cleaner.
Implementing Clean Browser Automation
I'm now fully immersed in the replacement strategy for scraper_tools.py. My primary focus is on developing a new version that leverages the clarity and stability of the code in 820_selenium.py and 440_browser_automation.py. I aim to structure the replacement with smaller, modular functions that are easily testable, prioritizing compatibility with the existing call chain from the Jupyter Notebook. The goal is a substantially cleaner and more reliable browser automation solution.
Designing the Refactored Scraper
Context verified. I'm now outlining the steps for building the new scraper_tools.py replacement. I'll start by analyzing what pipulate.core expects and then design an API that meets these needs. I'll use the cleaner code examples to create simpler Selenium interactions. I'm prioritizing modularity and testability within the existing call structure. Finally, I'll aim for a straightforward implementation plan for a smooth transition.
Crafting a Replacement
I'm now fully immersed in developing the new scraper_tools.py replacement, focusing on the core Selenium automation. I've sketched the structure and determined the necessary input arguments and the expected return value. Using the cleaner code examples, I'm building a streamlined, modular Selenium interaction tailored for compatibility. The key goal is to ensure a reliable and straightforward browser automation solution within the existing call structure.
Implementing Modular Automation
I'm now focused on the replacement plan for scraper_tools.py. I'll start by analyzing what pipulate.core.py expects from the current Selenium interaction. Then, I'll design an API that meets these needs. I plan to use the cleaner code examples to create modular Selenium functions that integrate smoothly within the existing call structure. I'll prioritize a straightforward implementation plan for a smooth transition.
Developing Modular Automation
I've outlined a replacement plan for scraper_tools.py, focusing on ensuring compatibility and cleaner Selenium logic. I'll begin by assessing what pipulate.core.py currently demands from the Selenium interaction. Then, I'll design a new API to meet these needs, drawing inspiration from the simpler code examples for modularity. I will ensure the final solution integrates within the existing call structure. The aim is to prioritize a straightforward implementation plan for a smooth transition.
Formulating the Implementation Strategy
I've been analyzing the expected inputs and outputs of scraper_tools.py in the context of the larger application. I plan to construct a more modular and robust replacement using clean examples for Selenium automation logic, ensuring compatibility with the existing call chain from the Jupyter Notebook and prioritizing a clear and straightforward implementation strategy.
Gemini: Context verified.
Of course! It looks like you've correctly identified a path to simplify your browser automation tools by refactoring `scraper_tools.py`. You're right, the existing app files (`820_selenium.py` and `440_browser_automation.py`) are excellent, battle-tested templates for this.
Let's do a "bull in a china shop" replacement. We'll create a new, clean `scraper_tools.py` based on the best parts of your existing code. This plan focuses on creating a robust, simple, and self-contained tool that fits perfectly into your `pip.scrape()` architecture.
Implementation Plan
Here is the plan to radically simplify your browser automation tool:

1. **Establish the Contract:** The `pip.scrape()` method in `pipulate/core.py` calls `selenium_automation(params)`. The `params` dictionary contains `url`, `pipeline_id`, and `take_screenshot`. The function is expected to return a dictionary containing `success` (boolean), `error` (string, if any), and `looking_at_files` (a dictionary of artifact paths). Our new implementation will strictly adhere to this contract, sketched just after this list.
2. **Borrow the Best Logic:** We will use `apps/440_browser_automation.py` as our primary inspiration because its `step_02_submit` function already contains the core logic for:
   - Scraping a page using `selenium-wire` (which gives us access to source, DOM, and network requests like headers).
   - Saving artifacts (`source.html`, `dom.html`, `headers.json`) to a structured directory. This is exactly what `pip.scrape()` is intended to orchestrate.
3. **Construct the New `selenium_automation` Tool:** We'll create a single, focused async function, `selenium_automation`, inside the new `scraper_tools.py`. This function will:
   - Be decorated with `@auto_tool` to ensure it's automatically registered and callable via your MCP architecture.
   - Use the `pipeline_id` (e.g., `scrape-mikelev-in-173005`) to create a unique output directory for each scrape, preventing collisions. This follows the pattern established in `pip.scrape()`. The base path will be `browser_automation/looking_at/` as hinted at in your `secretsauce.py` module.
   - Implement screenshot logic if `take_screenshot` is `True`.
   - Gracefully handle errors within a `try...except` block, returning a `success: False` dictionary.
   - Return a dictionary containing paths to all generated artifacts (`dom.html`, `source.html`, `screenshot.png`, etc.) upon success.
4. **Replace and Test:** The final step is to replace the old `tools/scraper_tools.py` with the new version. Because the function signature and return contract are maintained, the `workflow_test.ipynb` notebook should work immediately after you restart the kernel, with the `await secretsauce.test_advanced_scrape(job)` cell now running the clean, new code.
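To make the contract in step 1 concrete, here is the shape both sides agree on; the values below are purely illustrative:

```python
# What pip.scrape() hands to selenium_automation() (illustrative values).
params = {
    "url": "https://mikelev.in/",
    "pipeline_id": "scrape-mikelev-in-173005",
    "take_screenshot": True,
}

# What selenium_automation() returns on success...
success_result = {
    "success": True,
    "looking_at_files": {
        "dom": "browser_automation/looking_at/scrape-mikelev-in-173005/dom.html",
        "source": "browser_automation/looking_at/scrape-mikelev-in-173005/source.html",
        "headers": "browser_automation/looking_at/scrape-mikelev-in-173005/headers.json",
        "screenshot": "browser_automation/looking_at/scrape-mikelev-in-173005/screenshot.png",
    },
}

# ...and on failure.
failure_result = {"success": False, "error": "what went wrong", "looking_at_files": {}}
```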
Proposed New `tools/scraper_tools.py`
Here is the complete code for the new, simplified file. You can replace the entire contents of /home/mike/repos/pipulate/tools/scraper_tools.py with this.
The Initial AI-Driven Solution: scraper_tools.py Refactored
# /home/mike/repos/pipulate/tools/scraper_tools.py
import asyncio
import json
import os
from datetime import datetime
from pathlib import Path
from urllib.parse import quote, urlparse

from loguru import logger
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from seleniumwire import webdriver as wire_webdriver
from webdriver_manager.chrome import ChromeDriverManager

from tools import auto_tool

# --- Helper Functions (Borrowed from 440_browser_automation.py) ---

def get_safe_path_component(url: str) -> tuple[str, str]:
    """Converts a URL into filesystem-safe components for directory paths."""
    parsed = urlparse(url)
    domain = parsed.netloc
    path = parsed.path
    if not path or path == '/':
        path_slug = "ROOT"
    else:
        # Quote the full path to handle special characters, then truncate for sanity
        path_slug = quote(path, safe='').replace('/', '_')[:100]
    return domain, path_slug

# --- The Refactored Browser Automation Tool ---

@auto_tool
async def selenium_automation(params: dict) -> dict:
    """
    Performs an advanced browser automation scrape of a single URL.

    This tool gives AI "eyes" by launching a headless browser to capture a rich
    set of artifacts from a webpage, including the DOM, source code, headers,
    and an optional screenshot. It uses a clean, temporary browser profile for
    each run to ensure a consistent state.

    Args:
        params: A dictionary containing:
            - url (str): The URL to scrape.
            - pipeline_id (str): A unique ID for this job, used for the output folder name.
            - take_screenshot (bool): Whether to capture a screenshot of the page.

    Returns:
        A dictionary containing the results of the operation, including paths
        to all captured artifacts.
    """
    url = params.get("url")
    pipeline_id = params.get("pipeline_id", f"scrape-{datetime.now().isoformat()}")
    take_screenshot = params.get("take_screenshot", False)

    if not url:
        return {"success": False, "error": "URL parameter is required."}

    driver = None
    artifacts = {}

    try:
        # --- 1. Set up output directory ---
        domain, path_slug = get_safe_path_component(url)
        # Consistent with secretsauce.py's expectation
        output_dir = Path("browser_automation/looking_at/") / pipeline_id
        output_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"💾 Saving artifacts to: {output_dir}")

        # --- 2. Configure Selenium WebDriver ---
        chrome_options = Options()
        chrome_options.add_argument("--headless")  # Run in headless mode
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--window-size=1920,1080")

        # Use webdriver-manager for cross-platform compatibility
        effective_os = os.environ.get('EFFECTIVE_OS', sys.platform)
        if effective_os == 'darwin':
            service = Service(ChromeDriverManager().install())
        else:
            # Assumes chromedriver is in PATH for Linux/other environments
            service = Service()

        logger.info("🚀 Initializing Chrome driver with Selenium-Wire...")
        driver = wire_webdriver.Chrome(service=service, options=chrome_options)

        # --- 3. Scrape the Page ---
        logger.info(f" navigatin to: {url}")
        driver.get(url)
        await asyncio.sleep(3)  # Wait for JS to render

        # --- 4. Capture Artifacts ---
        # DOM
        dom_path = output_dir / "dom.html"
        dom_content = driver.execute_script("return document.documentElement.outerHTML;")
        dom_path.write_text(dom_content, encoding='utf-8')
        artifacts['dom'] = str(dom_path)

        # Source
        source_path = output_dir / "source.html"
        source_path.write_text(driver.page_source, encoding='utf-8')
        artifacts['source'] = str(source_path)

        # Headers
        main_request = next((r for r in driver.requests if r.response and r.url == url), driver.last_request)
        if main_request and main_request.response:
            headers_path = output_dir / "headers.json"
            headers_path.write_text(json.dumps(dict(main_request.response.headers), indent=2))
            artifacts['headers'] = str(headers_path)

        # Screenshot
        if take_screenshot:
            screenshot_path = output_dir / "screenshot.png"
            driver.save_screenshot(str(screenshot_path))
            artifacts['screenshot'] = str(screenshot_path)

        logger.success(f"✅ Scrape successful for {url}")
        return {"success": True, "looking_at_files": artifacts}

    except Exception as e:
        logger.error(f"❌ Scrape failed for {url}: {e}")
        return {"success": False, "error": str(e), "looking_at_files": artifacts}

    finally:
        if driver:
            driver.quit()
            logger.info("Browser closed.")
Me:
[mike@nixos:~/repos/pipulate/tools]$ xv scraper_tools.py
No reason to do a git diff here since it was a complete file replacement. I got a `sys` not found error and had to add `import sys` to the imports, then immediate success, except it's in headless mode, which actually bodes quite well for this working in Google Colab. The files were stored relative to the running script:
/home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/browser_automation/looking_at/scrape-mikelev-in-173548
And I looked at the files generated there:
faquilizer2/browser_automation/looking_at/scrape-mikelev-in-173548/dom.html
faquilizer2/browser_automation/looking_at/scrape-mikelev-in-173548/headers.json
faquilizer2/browser_automation/looking_at/scrape-mikelev-in-173548/screenshot.png
faquilizer2/browser_automation/looking_at/scrape-mikelev-in-173548/source.html
…and all good! So great job, Gemini!
I also want headless mode to be easily toggled on and off in an idiomatic way through the `pip.scrape()` API we're creating. The API should also support different scrape modes; we'll likely have an alternative requests/httpx way of doing it for when a browser doesn't have to be in the picture, but we want to default to this Selenium mode.
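To pin down what I mean, here's roughly the call shape I'm after from the Notebook; the `mode` and `headless` keyword names are just my proposal at this point, not anything that exists yet:

```python
from pipulate import pip

# Default: full Selenium browser, headless, artifacts saved to disk.
result = await pip.scrape(url="https://mikelev.in/", take_screenshot=True)

# Same scrape, but with a visible browser window for debugging.
result = await pip.scrape(url="https://mikelev.in/", take_screenshot=True, headless=False)

# Someday: a lightweight requests/httpx fetch with no browser in the picture.
result = await pip.scrape(url="https://mikelev.in/", mode="requests")
```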
--- 🧪 Starting Advanced Scrape Test Flight ---
-> Target: https://mikelev.in/
/home/mike/repos/pipulate/.venv/lib/python3.12/site-packages/seleniumwire/thirdparty/mitmproxy/contrib/kaitaistruct/tls_client_hello.py:10: UserWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html. The pkg_resources package is slated for removal as early as 2025-11-30. Refrain from using this package or pin to Setuptools<81.
from pkg_resources import parse_version
-> ✅ Success! Advanced scrape complete.
-> Artifacts captured in 'browser_automation/looking_at/':
- dom: browser_automation/looking_at/scrape-mikelev-in-173548/dom.html
- source: browser_automation/looking_at/scrape-mikelev-in-173548/source.html
- headers: browser_automation/looking_at/scrape-mikelev-in-173548/headers.json
- screenshot: browser_automation/looking_at/scrape-mikelev-in-173548/screenshot.png
--- 🧪 Test Flight Complete ---
Again, great job! If the next round of revisions has to touch pipulate/core.py, we don't do that sort of stuff generatively except in very small patches. Here's how that works (ai_edit.py):
# ai_edit.py
'''
# 🔧 AIRTIGHT AGENTIC PROMPT CONTRACT TEMPLATE v2.1
═══════════════════════════════════════════════════════════════════════════════════════════════
Hello Gemini.
You are waking up into the Pipulate codebase. Your mission is to .
You are a world-class AI Coding Assistant. My code is your world. My philosophy is your guide.
## Our Philosophy: Determinism Over Generation
The core principle of this project is **stability and predictability**. Your primary value is in executing well-defined plans with surgical precision. Generative "YOLO" edits on existing code are strictly forbidden as they introduce chaos and risk regression. Your goal is to be a master surgeon, not a mad scientist.
## Prime Directives: The Rules of this Workshop
1. **Orient Yourself:** Before *any* action, run `git status` and `git diff` to understand the current state. Compare this against the plan to know exactly what to do next.
2. **Deterministic Editing Protocol:** You have a hierarchy of tools for modifying code. You **MUST** use them in this order of preference:
* **🥇 GOLD STANDARD (for planned changes): `ai_edit.py`**
* This is your primary tool for surgical, multi-line code replacement.
* **Your Task:** Generate a Python patch file (e.g., `patch.py`) that defines a list of dictionaries named `patches`.
* **Patch File Format:** Each dictionary must have "file", "block\_name", and "new\_code" keys. Use triple-quoted f-strings for the `new_code` value for maximum readability and flexibility.
```python
# patch.py
patches = [
{
"file": "path/to/your_file.py",
"block_name": "sentinel_name_in_file",
"new_code": """
# Your new, properly indented code goes here.
# Use triple quotes for clean, multi-line strings.
new_function_call()
"""
}
]
```
* **Execution:** Run `.venv/bin/python ai_edit.py patch.py` to apply the changes.
* This is the **ONLY** approved method for complex refactoring.
* **🥈 SILVER STANDARD (for simple, safe changes): `sed`**
* Use `sed -i 's/old/new/g' path/to/file.py` for trivial, single-line, unambiguous replacements ONLY.
* Always verify with `git diff` immediately after.
* **🚫 FORBIDDEN:** You **MUST NOT** perform generative edits directly on existing source code files. Your generative capabilities should only be used to create the content for the `new_code` blocks within your Python patch file.
3. **One Small Step:** Execute only one atomic change at a time. After each successful commit, re-orient yourself before proceeding to the next step.
4. **Verify or Revert:**
* After every file modification, run `git diff` to confirm the change was correct.
* Watchdog is used to live-reload `server.py` on every edit so attempts to run the server should only be made to check for syntax errors.
* Run `.venv/bin/python server.py` to check for syntax errors and ensure the server can start. If it fails, capture the error.
* **If Successful:** `git add .` and `git commit` with a clear message.
* **If It Fails:** You **MUST IMMEDIATELY** run `git reset --hard HEAD`, append the captured error to the "Critical Failure Analysis" section, and terminate the session.
5. **Nix Environment:** You are in a `nix develop` shell. **NEVER** `pip install` anything. Use `.venv/bin/python` for all Python scripts.
## Current State and Critical Failure Analysis
* **Branch:** You are on the git branch: ``.
* **Last Known State:**
* **Critical Failure Analysis:**
## The Implementation Plan
## Completion Protocol (Definition of Done)
You are **DONE** when all steps in the plan are committed and `git status` is clean. Announce completion, show the `git log`, and terminate.
Your first action is to **orient yourself**. Begin now.
'''
import ast
import argparse
from pathlib import Path
import sys
import importlib.util
class CodeRefactorer:
"""
Performs robust, deterministic code block replacements in Python files
using sentinel comments and AST validation.
"""
def __init__(self, file_path: str | Path):
self.file_path = Path(file_path)
if not self.file_path.exists():
raise FileNotFoundError(f"File not found: {self.file_path}")
self._original_content = self.file_path.read_text()
self._new_content = self._original_content
def _verify_syntax(self, code_to_check: str, block_name: str):
try:
ast.parse(code_to_check)
print(f" ✅ AST validation successful for block '{block_name}'.")
except SyntaxError as e:
print(f" ❌ AST validation FAILED for block '{block_name}'. The proposed change would break the file.")
error_line = code_to_check.splitlines()[e.lineno - 1]
print(f" Error near line {e.lineno}: {error_line.strip()}")
print(f" {' ' * (e.offset - 1)}^")
print(f" Reason: {e.msg}")
raise e
def replace_block(self, block_name: str, new_code: str):
start_sentinel = f"# START: {block_name}"
end_sentinel = f"# END: {block_name}"
try:
before_block, rest = self._new_content.split(start_sentinel, 1)
old_block, after_block = rest.split(end_sentinel, 1)
# Use textwrap.dedent to handle triple-quoted string indentation
import textwrap
new_code = textwrap.dedent(new_code).strip()
base_indentation = before_block.split('\n')[-1]
indented_new_code = "\n".join(
f"{base_indentation}{line}" for line in new_code.split('\n')
)
content_with_replacement = (
f"{before_block}{start_sentinel}\n"
f"{indented_new_code}\n"
f"{base_indentation}{end_sentinel}{after_block}"
)
self._verify_syntax(content_with_replacement, block_name)
self._new_content = content_with_replacement
print(f" ✅ Block '{block_name}' in {self.file_path.name} is ready to be replaced.")
except ValueError:
print(f" ⚠️ Could not find sentinels for block '{block_name}' in {self.file_path.name}. Skipping.")
except Exception as e:
print(f" ❌ An error occurred while replacing block '{block_name}': {e}")
raise
def write_changes(self):
if self._new_content != self._original_content:
print(f"Writing changes to {self.file_path}...")
self.file_path.write_text(self._new_content)
print(" 💾 File saved successfully.")
else:
print(f"🤷 No changes were made to {self.file_path}.")
def load_patches_from_module(patch_module_path: Path):
"""Dynamically imports a Python module and returns its 'patches' list."""
try:
module_name = patch_module_path.stem
spec = importlib.util.spec_from_file_location(module_name, patch_module_path)
patch_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(patch_module)
return getattr(patch_module, 'patches')
except AttributeError:
print(f"Error: The patch file '{patch_module_path}' must define a list named 'patches'.")
sys.exit(1)
except Exception as e:
print(f"Error loading patch module '{patch_module_path}': {e}")
sys.exit(1)
def main():
parser = argparse.ArgumentParser(description="Apply deterministic, AST-validated code patches from a Python module.")
parser.add_argument("patch_file", help="Path to the Python file containing the 'patches' list.")
args = parser.parse_args()
patch_file_path = Path(args.patch_file)
if not patch_file_path.exists():
print(f"Error: Patch file not found at '{patch_file_path}'")
sys.exit(1)
patches = load_patches_from_module(patch_file_path)
print(f"Applying patches from: {patch_file_path.name}")
print("-" * 30)
patches_by_file = {}
for patch in patches:
file = patch.get("file")
if file not in patches_by_file:
patches_by_file[file] = []
patches_by_file[file].append(patch)
for file_path_str, file_patches in patches_by_file.items():
print(f"\nProcessing file: {file_path_str}")
try:
refactorer = CodeRefactorer(file_path_str)
for patch in file_patches:
block_name = patch.get("block_name")
new_code = patch.get("new_code")
if not block_name or new_code is None:
print(f" ⚠️ Skipping invalid patch item: {patch}")
continue
refactorer.replace_block(block_name, new_code)
refactorer.write_changes()
except (FileNotFoundError, SyntaxError) as e:
print(f"\nProcess aborted for {file_path_str} due to a critical error: {e}")
print("No changes have been written to this file.")
except Exception as e:
print(f"\nAn unexpected error occurred for {file_path_str}: {e}")
print("\n" + "-" * 30)
print("Refactoring process complete. Please review the changes with 'git diff'.")
if __name__ == "__main__":
main()
Gemini: Excellent! It's great to hear the refactor was a success and that headless mode is working as expected. That's a solid foundation to build on.
You're right, now is the perfect time to make the API for `pip.scrape()` more robust and idiomatic. Adding toggles for headless mode and setting up for different scrape modes is the logical next step. I fully understand the `ai_edit.py` protocol for deterministic changes, and we will use it for modifying `pipulate/core.py`.
Here is the implementation plan to enhance the `pip.scrape()` API.
Implementation Plan
Step 1: Add Sentinels (Manual Prerequisite)
To perform a surgical update, we first need to define the operation area. Please add these sentinel comments to the two files we'll be modifying. This allows the `ai_edit.py` script to replace only the specific functions we're targeting.
- In /home/mike/repos/pipulate/tools/scraper_tools.py, wrap the `selenium_automation` function like this:

  # START: selenium_automation
  @auto_tool
  async def selenium_automation(params: dict) -> dict:
      # ... existing function code ...
  # END: selenium_automation

- In /home/mike/repos/pipulate/pipulate/core.py, wrap the `scrape` method like this:

  # START: scrape_method
  async def scrape(self, url: str, take_screenshot: bool = False, **kwargs):
      # ... existing function code ...
  # END: scrape_method
Step 2: Generate the Patch File
Next, create a new file named patch.py in the root of your pipulate repository and paste the following code into it. This file contains the updated logic for both functions.
- The `selenium_automation` function will now check for a `headless` parameter.
- The `scrape` method in `Pipulate` will now accept `mode` and `headless` arguments, making the API cleaner and ready for future expansion.
# patch.py
patches = [
{
"file": "/home/mike/repos/pipulate/tools/scraper_tools.py",
"block_name": "selenium_automation",
"new_code": """
@auto_tool
async def selenium_automation(params: dict) -> dict:
\"\"\"
Performs an advanced browser automation scrape of a single URL.
This tool gives AI "eyes" by launching a browser to capture a rich
set of artifacts from a webpage, including the DOM, source code, headers,
and an optional screenshot. It uses a clean, temporary browser profile for
each run to ensure a consistent state.
Args:
params: A dictionary containing:
- url (str): The URL to scrape.
- pipeline_id (str): A unique ID for this job, used for the output folder name.
- take_screenshot (bool): Whether to capture a screenshot of the page.
- headless (bool): Whether to run the browser in headless mode. Defaults to True.
Returns:
A dictionary containing the results of the operation, including paths
to all captured artifacts.
\"\"\"
url = params.get("url")
pipeline_id = params.get("pipeline_id", f"scrape-{datetime.now().isoformat()}")
take_screenshot = params.get("take_screenshot", False)
headless = params.get("headless", True) # Default to headless mode
if not url:
return {"success": False, "error": "URL parameter is required."}
driver = None
artifacts = {}
try:
# --- 1. Set up output directory ---
domain, path_slug = get_safe_path_component(url)
output_dir = Path("browser_automation/looking_at/") / pipeline_id
output_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"💾 Saving artifacts to: {output_dir}")
# --- 2. Configure Selenium WebDriver ---
chrome_options = Options()
if headless:
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
# Use webdriver-manager for cross-platform compatibility
effective_os = os.environ.get('EFFECTIVE_OS', sys.platform)
if effective_os == 'darwin':
service = Service(ChromeDriverManager().install())
else:
service = Service() # Assumes chromedriver is in PATH
logger.info(f"🚀 Initializing Chrome driver (Headless: {headless})...")
driver = wire_webdriver.Chrome(service=service, options=chrome_options)
# --- 3. Scrape the Page ---
logger.info(f" navigating to: {url}")
driver.get(url)
await asyncio.sleep(3) # Wait for JS to render
# --- 4. Capture Artifacts ---
dom_path = output_dir / "dom.html"
dom_content = driver.execute_script("return document.documentElement.outerHTML;")
dom_path.write_text(dom_content, encoding='utf-8')
artifacts['dom'] = str(dom_path)
source_path = output_dir / "source.html"
source_path.write_text(driver.page_source, encoding='utf-8')
artifacts['source'] = str(source_path)
main_request = next((r for r in driver.requests if r.response and r.url == url), driver.last_request)
if main_request and main_request.response:
headers_path = output_dir / "headers.json"
headers_path.write_text(json.dumps(dict(main_request.response.headers), indent=2))
artifacts['headers'] = str(headers_path)
if take_screenshot:
screenshot_path = output_dir / "screenshot.png"
driver.save_screenshot(str(screenshot_path))
artifacts['screenshot'] = str(screenshot_path)
logger.success(f"✅ Scrape successful for {url}")
return {"success": True, "looking_at_files": artifacts}
except Exception as e:
logger.error(f"❌ Scrape failed for {url}: {e}")
return {"success": False, "error": str(e), "looking_at_files": artifacts}
finally:
if driver:
driver.quit()
logger.info("Browser closed.")
"""
},
{
"file": "/home/mike/repos/pipulate/pipulate/core.py",
"block_name": "scrape_method",
"new_code": """
async def scrape(self, url: str, take_screenshot: bool = False, mode: str = 'selenium', headless: bool = True, **kwargs):
\"\"\"
Gives AI "eyes" by performing browser automation or HTTP requests to scrape a URL.
This method is the primary entrypoint for scraping and supports multiple modes.
The default mode is 'selenium' which uses a full browser.
Args:
url (str): The URL to scrape.
take_screenshot (bool): Whether to capture a screenshot (selenium mode only). Defaults to False.
mode (str): The scraping mode to use ('selenium', 'requests', etc.). Defaults to 'selenium'.
headless (bool): Whether to run the browser in headless mode (selenium mode only). Defaults to True.
**kwargs: Additional parameters to pass to the underlying automation tool.
Returns:
dict: The result from the scraper tool, including paths to captured artifacts.
\"\"\"
from tools.scraper_tools import selenium_automation
from urllib.parse import urlparse
from datetime import datetime
logger.info(f"👁️🗨️ Initiating scrape for: {url} (Mode: {mode}, Headless: {headless})")
# Create a transient, descriptive pipeline_id for this one-off scrape.
domain = urlparse(url).netloc
timestamp = datetime.now().strftime('%H%M%S')
scrape_pipeline_id = f"scrape-{domain.replace('.', '-')}-{timestamp}"
params = {
"url": url,
"pipeline_id": scrape_pipeline_id,
"take_screenshot": take_screenshot,
"headless": headless,
**kwargs # Pass through any other params
}
if mode == 'selenium':
try:
result = await selenium_automation(params)
return result
except Exception as e:
logger.error(f"❌ Advanced scrape failed for {url}: {e}")
return {"success": False, "error": str(e)}
else:
logger.warning(f"Scrape mode '{mode}' is not yet implemented.")
return {"success": False, "error": f"Mode '{mode}' not implemented."}
"""
}
]
Step 3: Apply the Patch
Now, run the ai_edit.py script from your repository root to apply these changes deterministically:
.venv/bin/python ai_edit.py patch.py
After the script runs, you can review the changes with git diff, and then you'll be ready to test the new API from your Jupyter Notebook. You can now call pip.scrape(url, headless=False) to see the browser window pop up!
Me: Alright, I'm going to track patches in the git repo. It'll add a little bit of noise, but it will also tell a story. I did the same thing with foo_files.py as part of the prompt_foo.py system that I use to make these concatenated prompt payloads, but I haven't been able to bring myself to do it with the prompt.md file itself (what this article becomes) because I think that would be too noisy in the git repo. Anyway, here's the staging of adding the patch and inserting the comment sentinels:
[mike@nixos:~/repos/pipulate]$ git --no-pager diff
diff --git a/pipulate/core.py b/pipulate/core.py
index f8f1fcf1..6d922106 100644
--- a/pipulate/core.py
+++ b/pipulate/core.py
@@ -1836,7 +1836,6 @@ class Pipulate:
logger.error(f"🔍 DEBUG: Unexpected error in process_llm_interaction: {e}")
yield error_msg
- # START: notebook_api_methods
def read(self, job: str) -> dict:
"""Reads the entire state dictionary for a given job (pipeline_id)."""
state = self.read_state(job)
@@ -1873,7 +1872,8 @@ class Pipulate:
"""Gets a value for a key within a job's state."""
state = self.read_state(job)
return state.get(step, default)
-
+
+ # START: scrape_method
async def scrape(self, url: str, take_screenshot: bool = False, **kwargs):
"""
Gives AI "eyes" by performing advanced browser automation to scrape a URL.
@@ -1918,4 +1918,4 @@ class Pipulate:
except Exception as e:
logger.error(f"❌ Advanced scrape failed for {url}: {e}")
return {"success": False, "error": str(e)}
- # END: notebook_api_methods
+ # END: scrape_method
[mike@nixos:~/repos/pipulate]$ git status
On branch main
Your branch is up to date with 'origin/main'.
Changes to be committed:
(use "git restore --staged <file>..." to unstage)
new file: patch.py
Changes not staged for commit:
(use "git add <file>..." to update what will be committed)
(use "git restore <file>..." to discard changes in working directory)
modified: pipulate/core.py
[mike@nixos:~/repos/pipulate]$
And now I apply the patch.
[mike@nixos:~/repos/pipulate]$ python ai_edit.py patch.py
/home/mike/repos/pipulate/ai_edit.py:27: SyntaxWarning: invalid escape sequence '\_'
* **Patch File Format:** Each dictionary must have "file", "block\_name", and "new\_code" keys. Use triple-quoted f-strings for the `new_code` value for maximum readability and flexibility.
Applying patches from: patch.py
------------------------------
Processing file: /home/mike/repos/pipulate/tools/scraper_tools.py
⚠️ Could not find sentinels for block 'selenium_automation' in scraper_tools.py. Skipping.
🤷 No changes were made to /home/mike/repos/pipulate/tools/scraper_tools.py.
Processing file: /home/mike/repos/pipulate/pipulate/core.py
✅ AST validation successful for block 'scrape_method'.
✅ Block 'scrape_method' in core.py is ready to be replaced.
Writing changes to /home/mike/repos/pipulate/pipulate/core.py...
💾 File saved successfully.
------------------------------
Refactoring process complete. Please review the changes with 'git diff'.
[mike@nixos:~/repos/pipulate]$
…and it applies getting through the AST guardrails. Here’s the git diff:
[mike@nixos:~/repos/pipulate]$ git --no-pager diff
diff --git a/pipulate/core.py b/pipulate/core.py
index f8f1fcf1..6db662a2 100644
--- a/pipulate/core.py
+++ b/pipulate/core.py
@@ -1836,7 +1836,6 @@ class Pipulate:
logger.error(f"🔍 DEBUG: Unexpected error in process_llm_interaction: {e}")
yield error_msg
- # START: notebook_api_methods
def read(self, job: str) -> dict:
"""Reads the entire state dictionary for a given job (pipeline_id)."""
state = self.read_state(job)
@@ -1873,31 +1872,32 @@ class Pipulate:
"""Gets a value for a key within a job's state."""
state = self.read_state(job)
return state.get(step, default)
-
- async def scrape(self, url: str, take_screenshot: bool = False, **kwargs):
+
+ # START: scrape_method
+ async def scrape(self, url: str, take_screenshot: bool = False, mode: str = 'selenium', headless: bool = True, **kwargs):
"""
- Gives AI "eyes" by performing advanced browser automation to scrape a URL.
+ Gives AI "eyes" by performing browser automation or HTTP requests to scrape a URL.
- This method acts as a simplified bridge to the powerful browser automation
- tools, allowing for direct, on-demand scraping from notebooks or other clients.
+ This method is the primary entrypoint for scraping and supports multiple modes.
+ The default mode is 'selenium' which uses a full browser.
Args:
url (str): The URL to scrape.
- take_screenshot (bool): Whether to capture a screenshot of the page.
+ take_screenshot (bool): Whether to capture a screenshot (selenium mode only). Defaults to False.
+ mode (str): The scraping mode to use ('selenium', 'requests', etc.). Defaults to 'selenium'.
+ headless (bool): Whether to run the browser in headless mode (selenium mode only). Defaults to True.
**kwargs: Additional parameters to pass to the underlying automation tool.
Returns:
- dict: The result from the browser automation tool, typically including
- paths to captured artifacts like DOM, source, and screenshot.
+ dict: The result from the scraper tool, including paths to captured artifacts.
"""
from tools.scraper_tools import selenium_automation
from urllib.parse import urlparse
from datetime import datetime
- logger.info(f"👁️🗨️ Initiating advanced scrape for: {url}")
+ logger.info(f"👁️🗨️ Initiating scrape for: {url} (Mode: {mode}, Headless: {headless})")
# Create a transient, descriptive pipeline_id for this one-off scrape.
- # This allows us to use the workflow hijacking tool for a simple scrape.
domain = urlparse(url).netloc
timestamp = datetime.now().strftime('%H%M%S')
scrape_pipeline_id = f"scrape-{domain.replace('.', '-')}-{timestamp}"
@@ -1906,16 +1906,18 @@ class Pipulate:
"url": url,
"pipeline_id": scrape_pipeline_id,
"take_screenshot": take_screenshot,
- **kwargs # Pass through any other params
+ "headless": headless,
+ **kwargs # Pass through any other params
}
- try:
- # We call the 'workflow_hijack' tool, but in this context, it's just
- # navigating and capturing artifacts. We bypass the form-filling parts
- # by providing a unique, non-existent pipeline_id.
- result = await selenium_automation(params)
- return result
- except Exception as e:
- logger.error(f"❌ Advanced scrape failed for {url}: {e}")
- return {"success": False, "error": str(e)}
- # END: notebook_api_methods
+ if mode == 'selenium':
+ try:
+ result = await selenium_automation(params)
+ return result
+ except Exception as e:
+ logger.error(f"❌ Advanced scrape failed for {url}: {e}")
+ return {"success": False, "error": str(e)}
+ else:
+ logger.warning(f"Scrape mode '{mode}' is not yet implemented.")
+ return {"success": False, "error": f"Mode '{mode}' not implemented."}
+ # END: scrape_method
[mike@nixos:~/repos/pipulate]$
And now I test it again in the Jupyter Notebook in JupyterLab.
It works, but it's still in headless mode.
Also, the pipeline ID it uses to name the folders is way too much:
[mike@nixos:~/repos/pipulate/Notebooks/Client_Work/faquilizer2/browser_automation/looking_at]$ ls -la
total 32
drwxr-xr-x 8 mike users 4096 Oct 10 18:07 .
drwxr-xr-x 3 mike users 4096 Oct 9 17:38 ..
drwxr-xr-x 2 mike users 4096 Oct 10 17:35 scrape-mikelev-in-173500
drwxr-xr-x 2 mike users 4096 Oct 10 18:06 scrape-mikelev-in-173548
drwxr-xr-x 2 mike users 4096 Oct 10 18:04 scrape-mikelev-in-180412
drwxr-xr-x 2 mike users 4096 Oct 10 18:05 scrape-mikelev-in-180526
drwxr-xr-x 2 mike users 4096 Oct 10 18:06 scrape-mikelev-in-180637
drwxr-xr-x 2 mike users 4096 Oct 10 18:07 scrape-mikelev-in-180701
[mike@nixos:~/repos/pipulate/Notebooks/Client_Work/faquilizer2/browser_automation/looking_at]$ rm -rf *
[mike@nixos:~/repos/pipulate/Notebooks/Client_Work/faquilizer2/browser_automation/looking_at]$
I don't even know where to cd into to look. Make the folder name just the name of the domain, so in this case mikelev.in. There's no reason to convert dots to hyphens: filesystems support dots, and hyphens are legitimate in domains, so converting would cause ambiguity. Don't worry about the protocol. We will always assume https, and metadata elsewhere can confirm the full URLs.
Inside the domain folder, each page fetched should have its fully URL-encoded path used as the folder name. So the "slash" homepage directory would become a %2F folder, and so on. The only URL on a site not covered by this is the naked domain on sites that don't do the conventional Apache-style forwarding to the / path, and for those we will also use %2F. So yes, every other path on the site begins with %2F and it can get a bit verbose, but this ensures that the filesystem within a domain directory works as a 100% deterministic key/value-pair API for pulling up pages from scraped sites.
For now don’t worry about dates and IDs. We don’t need any unique identifiers in the system right now besides those derived from the URL.
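To make that concrete, here's a minimal sketch of the path math I have in mind; it's pure standard library and assumes nothing about Pipulate beyond the base folder name:

```python
from pathlib import Path
from urllib.parse import quote, urlparse

def artifact_dir(url: str, base: str = "browser_automation/looking_at") -> Path:
    """Map a URL onto its deterministic artifact folder: <base>/<domain>/<encoded path>."""
    parsed = urlparse(url)
    domain = parsed.netloc  # keep the dots; hyphens stay unambiguous
    # Encode with nothing marked safe so '/' becomes %2F; a naked domain also maps to %2F.
    path_slug = quote(parsed.path or "/", safe="")
    return Path(base) / domain / path_slug

print(artifact_dir("https://mikelev.in/"))            # browser_automation/looking_at/mikelev.in/%2F
print(artifact_dir("https://mikelev.in/blog/post/"))  # browser_automation/looking_at/mikelev.in/%2Fblog%2Fpost%2F
```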
I already tried turning headless mode off by changing the function signature default:
async def scrape(self, url: str, take_screenshot: bool = False, mode: str = 'selenium', headless: bool = False, **kwargs):
…and that didn't work, and we have to make sure that turning headless off, so we can see the browser pop up, actually works! It's such a major point here, and I know the AIs almost never want to allow that to occur, but it is a must here. So that's our set of priorities for the next revision, please and thank you! The sentinel markers are already in place so we can iterate rapidly, please and thank you!
Gemini: You’ve hit on some excellent points! My apologies, it seems I missed a step in my previous logic. Let’s get this fixed.
You’re absolutely right about a few things:
- Patch Failure: The patch for scraper_tools.py failed because I asked you to add the sentinels, but my generated patch file depended on them before they were committed. My mistake. We'll get that sorted.
- Headless Mode: You correctly diagnosed that changing the default in pip.scrape() isn't enough. The function calling it also needs to pass the parameter. To make testing obvious, we'll update the call in your notebook to explicitly request headless=False.
- Folder Naming: Your proposed folder structure (domain/%2Fpath%2Fslug) is much cleaner, more deterministic, and avoids ambiguity. It's a great improvement.
Let's implement these changes. Since the sentinels are now in place in core.py, we can iterate quickly.
Implementation Plan
Step 1: Add Sentinels (Manual Prerequisite)
Please add the following sentinel comments. This ensures our patch is precise and only touches the code we intend to change.
- In /home/mike/repos/pipulate/tools/scraper_tools.py (the one that was missed last time):

  # START: selenium_automation
  @auto_tool
  async def selenium_automation(params: dict) -> dict:
      # ... existing function code ...
  # END: selenium_automation

- In /home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/secretsauce.py:

  # START: test_advanced_scrape
  async def test_advanced_scrape(job: str):
      # ... existing function code ...
  # END: test_advanced_scrape

- In /home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/workflow_test.ipynb (which is a .py file):

  # %%
  # START: test_advanced_scrape_call
  # Cell 5 (Optional Test): Advanced Scrape
  # This cell tests the new browser-based scraper on the FIRST URL in your list.
  # It will generate a 'browser_automation/looking_at/' directory with rich artifacts.
  await secretsauce.test_advanced_scrape(job)
  # END: test_advanced_scrape_call
Step 2: Generate the Patch File
Replace the contents of patch.py with the following. This patch will update all necessary files to implement the new folder structure and correctly handle the headless parameter.
# patch.py
patches = [
{
"file": "/home/mike/repos/pipulate/tools/scraper_tools.py",
"block_name": "selenium_automation",
"new_code": r"""
@auto_tool
async def selenium_automation(params: dict) -> dict:
\"\"\"
Performs an advanced browser automation scrape of a single URL.
This tool gives AI "eyes" by launching a browser to capture a rich
set of artifacts from a webpage, including the DOM, source code, headers,
and an optional screenshot. It uses a clean, temporary browser profile for
each run to ensure a consistent state.
Args:
params: A dictionary containing:
- url (str): The URL to scrape.
- domain (str): The domain of the URL, used as the root folder.
- url_path_slug (str): The URL-encoded path, used as the sub-folder.
- take_screenshot (bool): Whether to capture a screenshot of the page.
- headless (bool): Whether to run the browser in headless mode. Defaults to True.
Returns:
A dictionary containing the results of the operation, including paths
to all captured artifacts.
\"\"\"
url = params.get("url")
domain = params.get("domain")
url_path_slug = params.get("url_path_slug")
take_screenshot = params.get("take_screenshot", False)
headless = params.get("headless", True)
if not all([url, domain, url_path_slug is not None]):
return {"success": False, "error": "URL, domain, and url_path_slug parameters are required."}
driver = None
artifacts = {}
try:
# --- 1. Set up output directory using new structure ---
output_dir = Path("browser_automation/looking_at/") / domain / url_path_slug
output_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"💾 Saving artifacts to: {output_dir}")
# --- 2. Configure Selenium WebDriver ---
chrome_options = Options()
if headless:
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--start-maximized") # Better for non-headless
chrome_options.add_argument("--window-size=1920,1080")
effective_os = os.environ.get('EFFECTIVE_OS', sys.platform)
if effective_os == 'darwin':
service = Service(ChromeDriverManager().install())
else:
service = Service()
logger.info(f"🚀 Initializing Chrome driver (Headless: {headless})...")
driver = wire_webdriver.Chrome(service=service, options=chrome_options)
# --- 3. Scrape the Page ---
logger.info(f" navigating to: {url}")
driver.get(url)
await asyncio.sleep(3)
# --- 4. Capture Artifacts ---
dom_path = output_dir / "dom.html"
dom_path.write_text(driver.execute_script("return document.documentElement.outerHTML;"), encoding='utf-8')
artifacts['dom'] = str(dom_path)
source_path = output_dir / "source.html"
source_path.write_text(driver.page_source, encoding='utf-8')
artifacts['source'] = str(source_path)
main_request = next((r for r in driver.requests if r.response and r.url == url), driver.last_request)
if main_request and main_request.response:
headers_path = output_dir / "headers.json"
headers_path.write_text(json.dumps(dict(main_request.response.headers), indent=2))
artifacts['headers'] = str(headers_path)
if take_screenshot:
screenshot_path = output_dir / "screenshot.png"
driver.save_screenshot(str(screenshot_path))
artifacts['screenshot'] = str(screenshot_path)
logger.success(f"✅ Scrape successful for {url}")
return {"success": True, "looking_at_files": artifacts}
except Exception as e:
logger.error(f"❌ Scrape failed for {url}: {e}")
return {"success": False, "error": str(e), "looking_at_files": artifacts}
finally:
if driver:
driver.quit()
logger.info("Browser closed.")
"""
},
{
"file": "/home/mike/repos/pipulate/pipulate/core.py",
"block_name": "scrape_method",
"new_code": r"""
async def scrape(self, url: str, take_screenshot: bool = False, mode: str = 'selenium', headless: bool = True, **kwargs):
\"\"\"
Gives AI "eyes" by performing browser automation or HTTP requests to scrape a URL.
This method is the primary entrypoint for scraping and supports multiple modes.
The default mode is 'selenium' which uses a full browser.
Args:
url (str): The URL to scrape.
take_screenshot (bool): Whether to capture a screenshot (selenium mode only). Defaults to False.
mode (str): The scraping mode to use ('selenium', 'requests', etc.). Defaults to 'selenium'.
headless (bool): Whether to run the browser in headless mode (selenium mode only). Defaults to True.
**kwargs: Additional parameters to pass to the underlying automation tool.
Returns:
dict: The result from the scraper tool, including paths to captured artifacts.
\"\"\"
from tools.scraper_tools import selenium_automation
from urllib.parse import urlparse, quote
from datetime import datetime
logger.info(f"👁️🗨️ Initiating scrape for: {url} (Mode: {mode}, Headless: {headless})")
# --- New Directory Logic ---
parsed_url = urlparse(url)
domain = parsed_url.netloc
path = parsed_url.path or '/'
# Use quote with an empty safe string to encode everything, including slashes
url_path_slug = quote(path, safe='')
params = {
"url": url,
"domain": domain,
"url_path_slug": url_path_slug,
"take_screenshot": take_screenshot,
"headless": headless,
**kwargs # Pass through any other params
}
if mode == 'selenium':
try:
result = await selenium_automation(params)
return result
except Exception as e:
logger.error(f"❌ Advanced scrape failed for {url}: {e}")
return {"success": False, "error": str(e)}
else:
logger.warning(f"Scrape mode '{mode}' is not yet implemented.")
return {"success": False, "error": f"Mode '{mode}' not implemented."}
"""
},
{
"file": "/home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/secretsauce.py",
"block_name": "test_advanced_scrape",
"new_code": r"""
async def test_advanced_scrape(job: str, headless: bool = False):
\"\"\"
NEW (Optional Test): Scrapes the FIRST URL from the list using the advanced
pip.scrape() browser automation to capture a full set of artifacts.
\"\"\"
print("\n--- 🧪 Starting Advanced Scrape Test Flight ---")
urls_to_process = pip.get(job, URL_LIST_STEP, [])
if not urls_to_process:
print(" -> No URLs found to test. Skipping.")
return
url_to_test = urls_to_process[0]
print(f" -> Target: {url_to_test}")
print(f" -> Headless Mode: {headless}")
# This is the call to the powerful, Selenium-based scraper
# exposed through the pipulate library, now with headless toggle.
result = await pip.scrape(url=url_to_test, take_screenshot=True, headless=headless)
if result.get('success'):
print(f" -> ✅ Success! Advanced scrape complete.")
files_created = result.get('looking_at_files', {})
print(" -> Artifacts captured in 'browser_automation/looking_at/':")
for key, path in files_created.items():
if path:
print(f" - {key}: {path}")
else:
print(f" -> ❌ Failed: {result.get('error')}")
print("--- 🧪 Test Flight Complete ---\n")
"""
},
{
"file": "/home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/workflow_test.ipynb",
"block_name": "test_advanced_scrape_call",
"new_code": r"""
# Cell 5 (Optional Test): Advanced Scrape
# This cell tests the new browser-based scraper on the FIRST URL in your list.
# It will generate a 'browser_automation/looking_at/' directory with rich artifacts.
# Set headless=False to see the browser window pop up.
await secretsauce.test_advanced_scrape(job, headless=False)
"""
}
]
Step 3: Apply the Patch
Now, run the ai_edit.py script from your repository root. It will apply all the changes across the necessary files in one deterministic operation.
.venv/bin/python ai_edit.py patch.py
After this, when you run your workflow_test.ipynb, it should correctly call the scrape function with headless=False, and you will see the browser window appear. The output files will be saved to the clean, new directory structure (.../looking_at/mikelev.in/%2F/).
Me:
[mike@nixos:~/repos/pipulate]$ python ai_edit.py patch.py
/home/mike/repos/pipulate/ai_edit.py:27: SyntaxWarning: invalid escape sequence '\_'
* **Patch File Format:** Each dictionary must have "file", "block\_name", and "new\_code" keys. Use triple-quoted f-strings for the `new_code` value for maximum readability and flexibility.
Applying patches from: patch.py
------------------------------
Processing file: /home/mike/repos/pipulate/tools/scraper_tools.py
❌ AST validation FAILED for block 'selenium_automation'. The proposed change would break the file.
Error near line 38: \"\"\"
^
Reason: unexpected character after line continuation character
❌ An error occurred while replacing block 'selenium_automation': unexpected character after line continuation character (<unknown>, line 38)
Process aborted for /home/mike/repos/pipulate/tools/scraper_tools.py due to a critical error: unexpected character after line continuation character (<unknown>, line 38)
No changes have been written to this file.
Processing file: /home/mike/repos/pipulate/pipulate/core.py
❌ AST validation FAILED for block 'scrape_method'. The proposed change would break the file.
Error near line 1878: \"\"\"
^
Reason: unexpected character after line continuation character
❌ An error occurred while replacing block 'scrape_method': unexpected character after line continuation character (<unknown>, line 1878)
Process aborted for /home/mike/repos/pipulate/pipulate/core.py due to a critical error: unexpected character after line continuation character (<unknown>, line 1878)
No changes have been written to this file.
Processing file: /home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/secretsauce.py
❌ AST validation FAILED for block 'test_advanced_scrape'. The proposed change would break the file.
Error near line 208: \"\"\"
^
Reason: unexpected character after line continuation character
❌ An error occurred while replacing block 'test_advanced_scrape': unexpected character after line continuation character (<unknown>, line 208)
Process aborted for /home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/secretsauce.py due to a critical error: unexpected character after line continuation character (<unknown>, line 208)
No changes have been written to this file.
Processing file: /home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/workflow_test.ipynb
⚠️ Could not find sentinels for block 'test_advanced_scrape_call' in workflow_test.ipynb. Skipping.
🤷 No changes were made to /home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/workflow_test.ipynb.
------------------------------
Refactoring process complete. Please review the changes with 'git diff'.
[mike@nixos:~/repos/pipulate]$
Try again with the stuff that failed on AST. And don't do the edits on the Notebook; the original file is .ipynb and this won't work, so I'll hand-edit that 1-line change. New patch please and thank you!
Gemini: You've found the bug! My apologies. The ai_edit.py script's AST validation is doing its job perfectly and caught a subtle syntax error I introduced.
The error unexpected character after line continuation character happened because I wrapped the new code blocks in raw strings (r"""..."""). That left the backslash escapes in the inner docstrings (\"\"\") in the patched code verbatim, where the parser misinterprets them.
Let's fix this by using regular triple-quoted strings and removing the patch for the notebook file, as you requested.
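A tiny illustration of why the AST guard tripped; this assumes nothing about ai_edit.py beyond the fact that it runs ast.parse() over the patched text:

```python
import ast

# In a regular triple-quoted patch string, the \"\"\" escapes collapse to plain
# quotes before the text is written out, so the patched file gets a valid docstring.
regular = "def f():\n    \"\"\"docstring\"\"\"\n    return 1\n"
ast.parse(regular)  # parses cleanly

# In a raw string the backslashes survive verbatim, so the patched file contains
# literal \"\"\" and the parser rejects it with the exact error seen above.
raw = r"def f():" + "\n" + r"    \"\"\"docstring\"\"\"" + "\n" + "    return 1\n"
try:
    ast.parse(raw)
except SyntaxError as e:
    print(e.msg)  # unexpected character after line continuation character
```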
Corrected Patch File
Please replace the entire contents of your patch.py file with this corrected version. It removes the raw string formatting and omits the change for the .ipynb file.
# patch.py
patches = [
    {
        "file": "/home/mike/repos/pipulate/tools/scraper_tools.py",
        "block_name": "selenium_automation",
        "new_code": """
@auto_tool
async def selenium_automation(params: dict) -> dict:
    \"\"\"
    Performs an advanced browser automation scrape of a single URL.

    This tool gives AI "eyes" by launching a browser to capture a rich
    set of artifacts from a webpage, including the DOM, source code, headers,
    and an optional screenshot. It uses a clean, temporary browser profile for
    each run to ensure a consistent state.

    Args:
        params: A dictionary containing:
            - url (str): The URL to scrape.
            - domain (str): The domain of the URL, used as the root folder.
            - url_path_slug (str): The URL-encoded path, used as the sub-folder.
            - take_screenshot (bool): Whether to capture a screenshot of the page.
            - headless (bool): Whether to run the browser in headless mode. Defaults to True.

    Returns:
        A dictionary containing the results of the operation, including paths
        to all captured artifacts.
    \"\"\"
    url = params.get("url")
    domain = params.get("domain")
    url_path_slug = params.get("url_path_slug")
    take_screenshot = params.get("take_screenshot", False)
    headless = params.get("headless", True)

    if not all([url, domain, url_path_slug is not None]):
        return {"success": False, "error": "URL, domain, and url_path_slug parameters are required."}

    driver = None
    artifacts = {}

    try:
        # --- 1. Set up output directory using new structure ---
        output_dir = Path("browser_automation/looking_at/") / domain / url_path_slug
        output_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"💾 Saving artifacts to: {output_dir}")

        # --- 2. Configure Selenium WebDriver ---
        chrome_options = Options()
        if headless:
            chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--start-maximized") # Better for non-headless
        chrome_options.add_argument("--window-size=1920,1080")

        effective_os = os.environ.get('EFFECTIVE_OS', sys.platform)
        if effective_os == 'darwin':
            service = Service(ChromeDriverManager().install())
        else:
            service = Service()

        logger.info(f"🚀 Initializing Chrome driver (Headless: {headless})...")
        driver = wire_webdriver.Chrome(service=service, options=chrome_options)

        # --- 3. Scrape the Page ---
        logger.info(f" navigating to: {url}")
        driver.get(url)
        await asyncio.sleep(3)

        # --- 4. Capture Artifacts ---
        dom_path = output_dir / "dom.html"
        dom_path.write_text(driver.execute_script("return document.documentElement.outerHTML;"), encoding='utf-8')
        artifacts['dom'] = str(dom_path)

        source_path = output_dir / "source.html"
        source_path.write_text(driver.page_source, encoding='utf-8')
        artifacts['source'] = str(source_path)

        main_request = next((r for r in driver.requests if r.response and r.url == url), driver.last_request)
        if main_request and main_request.response:
            headers_path = output_dir / "headers.json"
            headers_path.write_text(json.dumps(dict(main_request.response.headers), indent=2))
            artifacts['headers'] = str(headers_path)

        if take_screenshot:
            screenshot_path = output_dir / "screenshot.png"
            driver.save_screenshot(str(screenshot_path))
            artifacts['screenshot'] = str(screenshot_path)

        logger.success(f"✅ Scrape successful for {url}")
        return {"success": True, "looking_at_files": artifacts}

    except Exception as e:
        logger.error(f"❌ Scrape failed for {url}: {e}")
        return {"success": False, "error": str(e), "looking_at_files": artifacts}

    finally:
        if driver:
            driver.quit()
            logger.info("Browser closed.")
"""
    },
    {
        "file": "/home/mike/repos/pipulate/pipulate/core.py",
        "block_name": "scrape_method",
        "new_code": """
    async def scrape(self, url: str, take_screenshot: bool = False, mode: str = 'selenium', headless: bool = True, **kwargs):
        \"\"\"
        Gives AI "eyes" by performing browser automation or HTTP requests to scrape a URL.

        This method is the primary entrypoint for scraping and supports multiple modes.
        The default mode is 'selenium' which uses a full browser.

        Args:
            url (str): The URL to scrape.
            take_screenshot (bool): Whether to capture a screenshot (selenium mode only). Defaults to False.
            mode (str): The scraping mode to use ('selenium', 'requests', etc.). Defaults to 'selenium'.
            headless (bool): Whether to run the browser in headless mode (selenium mode only). Defaults to True.
            **kwargs: Additional parameters to pass to the underlying automation tool.

        Returns:
            dict: The result from the scraper tool, including paths to captured artifacts.
        \"\"\"
        from tools.scraper_tools import selenium_automation
        from urllib.parse import urlparse, quote
        from datetime import datetime

        logger.info(f"👁️🗨️ Initiating scrape for: {url} (Mode: {mode}, Headless: {headless})")

        # --- New Directory Logic ---
        parsed_url = urlparse(url)
        domain = parsed_url.netloc
        path = parsed_url.path or '/'
        # Use quote with an empty safe string to encode everything, including slashes
        url_path_slug = quote(path, safe='')

        params = {
            "url": url,
            "domain": domain,
            "url_path_slug": url_path_slug,
            "take_screenshot": take_screenshot,
            "headless": headless,
            **kwargs # Pass through any other params
        }

        if mode == 'selenium':
            try:
                result = await selenium_automation(params)
                return result
            except Exception as e:
                logger.error(f"❌ Advanced scrape failed for {url}: {e}")
                return {"success": False, "error": str(e)}
        else:
            logger.warning(f"Scrape mode '{mode}' is not yet implemented.")
            return {"success": False, "error": f"Mode '{mode}' not implemented."}
"""
    },
    {
        "file": "/home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/secretsauce.py",
        "block_name": "test_advanced_scrape",
        "new_code": """
async def test_advanced_scrape(job: str, headless: bool = False):
    \"\"\"
    NEW (Optional Test): Scrapes the FIRST URL from the list using the advanced
    pip.scrape() browser automation to capture a full set of artifacts.
    \"\"\"
    print("\\n--- 🧪 Starting Advanced Scrape Test Flight ---")
    urls_to_process = pip.get(job, URL_LIST_STEP, [])
    if not urls_to_process:
        print(" -> No URLs found to test. Skipping.")
        return

    url_to_test = urls_to_process[0]
    print(f" -> Target: {url_to_test}")
    print(f" -> Headless Mode: {headless}")

    # This is the call to the powerful, Selenium-based scraper
    # exposed through the pipulate library, now with headless toggle.
    result = await pip.scrape(url=url_to_test, take_screenshot=True, headless=headless)

    if result.get('success'):
        print(f" -> ✅ Success! Advanced scrape complete.")
        files_created = result.get('looking_at_files', {})
        print(" -> Artifacts captured in 'browser_automation/looking_at/':")
        for key, path in files_created.items():
            if path:
                print(f" - {key}: {path}")
    else:
        print(f" -> ❌ Failed: {result.get('error')}")
    print("--- 🧪 Test Flight Complete ---\\n")
"""
    }
]
Now, when you run python ai_edit.py patch.py, it should pass the AST validation and apply the changes successfully. Please and thank you for catching that!
Me: Much better!
[mike@nixos:~/repos/pipulate]$ python ai_edit.py patch.py
/home/mike/repos/pipulate/ai_edit.py:27: SyntaxWarning: invalid escape sequence '\_'
* **Patch File Format:** Each dictionary must have "file", "block\_name", and "new\_code" keys. Use triple-quoted f-strings for the `new_code` value for maximum readability and flexibility.
Applying patches from: patch.py
------------------------------
Processing file: /home/mike/repos/pipulate/tools/scraper_tools.py
✅ AST validation successful for block 'selenium_automation'.
✅ Block 'selenium_automation' in scraper_tools.py is ready to be replaced.
Writing changes to /home/mike/repos/pipulate/tools/scraper_tools.py...
💾 File saved successfully.
Processing file: /home/mike/repos/pipulate/pipulate/core.py
✅ AST validation successful for block 'scrape_method'.
✅ Block 'scrape_method' in core.py is ready to be replaced.
Writing changes to /home/mike/repos/pipulate/pipulate/core.py...
💾 File saved successfully.
Processing file: /home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/secretsauce.py
✅ AST validation successful for block 'test_advanced_scrape'.
✅ Block 'test_advanced_scrape' in secretsauce.py is ready to be replaced.
Writing changes to /home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/secretsauce.py...
💾 File saved successfully.
------------------------------
Refactoring process complete. Please review the changes with 'git diff'.
[mike@nixos:~/repos/pipulate]$
And the git diff:
[mike@nixos:~/repos/pipulate]$ git --no-pager diff
diff --git a/patch.py b/patch.py
index 38df5817..18e4f42b 100644
--- a/patch.py
+++ b/patch.py
@@ -17,7 +17,8 @@ patches = [
Args:
params: A dictionary containing:
- url (str): The URL to scrape.
- - pipeline_id (str): A unique ID for this job, used for the output folder name.
+ - domain (str): The domain of the URL, used as the root folder.
+ - url_path_slug (str): The URL-encoded path, used as the sub-folder.
- take_screenshot (bool): Whether to capture a screenshot of the page.
- headless (bool): Whether to run the browser in headless mode. Defaults to True.
@@ -26,20 +27,20 @@ patches = [
to all captured artifacts.
\"\"\"
url = params.get("url")
- pipeline_id = params.get("pipeline_id", f"scrape-{datetime.now().isoformat()}")
+ domain = params.get("domain")
+ url_path_slug = params.get("url_path_slug")
take_screenshot = params.get("take_screenshot", False)
- headless = params.get("headless", True) # Default to headless mode
+ headless = params.get("headless", True)
- if not url:
- return {"success": False, "error": "URL parameter is required."}
+ if not all([url, domain, url_path_slug is not None]):
+ return {"success": False, "error": "URL, domain, and url_path_slug parameters are required."}
driver = None
artifacts = {}
try:
- # --- 1. Set up output directory ---
- domain, path_slug = get_safe_path_component(url)
- output_dir = Path("browser_automation/looking_at/") / pipeline_id
+ # --- 1. Set up output directory using new structure ---
+ output_dir = Path("browser_automation/looking_at/") / domain / url_path_slug
output_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"💾 Saving artifacts to: {output_dir}")
@@ -49,14 +50,14 @@ patches = [
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
+ chrome_options.add_argument("--start-maximized") # Better for non-headless
chrome_options.add_argument("--window-size=1920,1080")
- # Use webdriver-manager for cross-platform compatibility
effective_os = os.environ.get('EFFECTIVE_OS', sys.platform)
if effective_os == 'darwin':
service = Service(ChromeDriverManager().install())
else:
- service = Service() # Assumes chromedriver is in PATH
+ service = Service()
logger.info(f"🚀 Initializing Chrome driver (Headless: {headless})...")
driver = wire_webdriver.Chrome(service=service, options=chrome_options)
@@ -64,12 +65,11 @@ patches = [
# --- 3. Scrape the Page ---
logger.info(f" navigating to: {url}")
driver.get(url)
- await asyncio.sleep(3) # Wait for JS to render
+ await asyncio.sleep(3)
# --- 4. Capture Artifacts ---
dom_path = output_dir / "dom.html"
- dom_content = driver.execute_script("return document.documentElement.outerHTML;")
- dom_path.write_text(dom_content, encoding='utf-8')
+ dom_path.write_text(driver.execute_script("return document.documentElement.outerHTML;"), encoding='utf-8')
artifacts['dom'] = str(dom_path)
source_path = output_dir / "source.html"
@@ -122,19 +122,22 @@ patches = [
dict: The result from the scraper tool, including paths to captured artifacts.
\"\"\"
from tools.scraper_tools import selenium_automation
- from urllib.parse import urlparse
+ from urllib.parse import urlparse, quote
from datetime import datetime
logger.info(f"👁️🗨️ Initiating scrape for: {url} (Mode: {mode}, Headless: {headless})")
- # Create a transient, descriptive pipeline_id for this one-off scrape.
- domain = urlparse(url).netloc
- timestamp = datetime.now().strftime('%H%M%S')
- scrape_pipeline_id = f"scrape-{domain.replace('.', '-')}-{timestamp}"
+ # --- New Directory Logic ---
+ parsed_url = urlparse(url)
+ domain = parsed_url.netloc
+ path = parsed_url.path or '/'
+ # Use quote with an empty safe string to encode everything, including slashes
+ url_path_slug = quote(path, safe='')
params = {
"url": url,
- "pipeline_id": scrape_pipeline_id,
+ "domain": domain,
+ "url_path_slug": url_path_slug,
"take_screenshot": take_screenshot,
"headless": headless,
**kwargs # Pass through any other params
@@ -151,5 +154,39 @@ patches = [
logger.warning(f"Scrape mode '{mode}' is not yet implemented.")
return {"success": False, "error": f"Mode '{mode}' not implemented."}
"""
+ },
+ {
+ "file": "/home/mike/repos/pipulate/Notebooks/Client_Work/faquilizer2/secretsauce.py",
+ "block_name": "test_advanced_scrape",
+ "new_code": """
+ async def test_advanced_scrape(job: str, headless: bool = False):
+ \"\"\"
+ NEW (Optional Test): Scrapes the FIRST URL from the list using the advanced
+ pip.scrape() browser automation to capture a full set of artifacts.
+ \"\"\"
+ print("\\n--- 🧪 Starting Advanced Scrape Test Flight ---")
+ urls_to_process = pip.get(job, URL_LIST_STEP, [])
+ if not urls_to_process:
+ print(" -> No URLs found to test. Skipping.")
+ return
+ url_to_test = urls_to_process[0]
+ print(f" -> Target: {url_to_test}")
+ print(f" -> Headless Mode: {headless}")
+
+ # This is the call to the powerful, Selenium-based scraper
+ # exposed through the pipulate library, now with headless toggle.
+ result = await pip.scrape(url=url_to_test, take_screenshot=True, headless=headless)
+
+ if result.get('success'):
+ print(f" -> ✅ Success! Advanced scrape complete.")
+ files_created = result.get('looking_at_files', {})
+ print(" -> Artifacts captured in 'browser_automation/looking_at/':")
+ for key, path in files_created.items():
+ if path:
+ print(f" - {key}: {path}")
+ else:
+ print(f" -> ❌ Failed: {result.get('error')}")
+ print("--- 🧪 Test Flight Complete ---\\n")
+ """
}
]
\ No newline at end of file
diff --git a/pipulate/core.py b/pipulate/core.py
index b5637c1d..95a7e72e 100644
--- a/pipulate/core.py
+++ b/pipulate/core.py
@@ -1874,7 +1874,7 @@ class Pipulate:
return state.get(step, default)
# START: scrape_method
- async def scrape(self, url: str, take_screenshot: bool = False, mode: str = 'selenium', headless: bool = False, **kwargs):
+ async def scrape(self, url: str, take_screenshot: bool = False, mode: str = 'selenium', headless: bool = True, **kwargs):
"""
Gives AI "eyes" by performing browser automation or HTTP requests to scrape a URL.
@@ -1885,26 +1885,29 @@ class Pipulate:
url (str): The URL to scrape.
take_screenshot (bool): Whether to capture a screenshot (selenium mode only). Defaults to False.
mode (str): The scraping mode to use ('selenium', 'requests', etc.). Defaults to 'selenium'.
- headless (bool): Whether to run the browser in headless mode (selenium mode only). Defaults to False.
+ headless (bool): Whether to run the browser in headless mode (selenium mode only). Defaults to True.
**kwargs: Additional parameters to pass to the underlying automation tool.
Returns:
dict: The result from the scraper tool, including paths to captured artifacts.
"""
from tools.scraper_tools import selenium_automation
- from urllib.parse import urlparse
+ from urllib.parse import urlparse, quote
from datetime import datetime
logger.info(f"👁️🗨️ Initiating scrape for: {url} (Mode: {mode}, Headless: {headless})")
- # Create a transient, descriptive pipeline_id for this one-off scrape.
- domain = urlparse(url).netloc
- timestamp = datetime.now().strftime('%H%M%S')
- scrape_pipeline_id = f"scrape-{domain.replace('.', '-')}-{timestamp}"
+ # --- New Directory Logic ---
+ parsed_url = urlparse(url)
+ domain = parsed_url.netloc
+ path = parsed_url.path or '/'
+ # Use quote with an empty safe string to encode everything, including slashes
+ url_path_slug = quote(path, safe='')
params = {
"url": url,
- "pipeline_id": scrape_pipeline_id,
+ "domain": domain,
+ "url_path_slug": url_path_slug,
"take_screenshot": take_screenshot,
"headless": headless,
**kwargs # Pass through any other params
diff --git a/tools/scraper_tools.py b/tools/scraper_tools.py
index bf41402a..8e8fe758 100644
--- a/tools/scraper_tools.py
+++ b/tools/scraper_tools.py
@@ -32,12 +32,13 @@ def get_safe_path_component(url: str) -> tuple[str, str]:
# --- The Refactored Browser Automation Tool ---
+# START: selenium_automation
@auto_tool
async def selenium_automation(params: dict) -> dict:
"""
Performs an advanced browser automation scrape of a single URL.
- This tool gives AI "eyes" by launching a headless browser to capture a rich
+ This tool gives AI "eyes" by launching a browser to capture a rich
set of artifacts from a webpage, including the DOM, source code, headers,
and an optional screenshot. It uses a clean, temporary browser profile for
each run to ensure a consistent state.
@@ -45,74 +46,71 @@ async def selenium_automation(params: dict) -> dict:
Args:
params: A dictionary containing:
- url (str): The URL to scrape.
- - pipeline_id (str): A unique ID for this job, used for the output folder name.
+ - domain (str): The domain of the URL, used as the root folder.
+ - url_path_slug (str): The URL-encoded path, used as the sub-folder.
- take_screenshot (bool): Whether to capture a screenshot of the page.
+ - headless (bool): Whether to run the browser in headless mode. Defaults to True.
Returns:
A dictionary containing the results of the operation, including paths
to all captured artifacts.
"""
url = params.get("url")
- pipeline_id = params.get("pipeline_id", f"scrape-{datetime.now().isoformat()}")
+ domain = params.get("domain")
+ url_path_slug = params.get("url_path_slug")
take_screenshot = params.get("take_screenshot", False)
+ headless = params.get("headless", True)
- if not url:
- return {"success": False, "error": "URL parameter is required."}
+ if not all([url, domain, url_path_slug is not None]):
+ return {"success": False, "error": "URL, domain, and url_path_slug parameters are required."}
driver = None
artifacts = {}
try:
- # --- 1. Set up output directory ---
- domain, path_slug = get_safe_path_component(url)
- # Consistent with secretsauce.py's expectation
- output_dir = Path("browser_automation/looking_at/") / pipeline_id
+ # --- 1. Set up output directory using new structure ---
+ output_dir = Path("browser_automation/looking_at/") / domain / url_path_slug
output_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"💾 Saving artifacts to: {output_dir}")
# --- 2. Configure Selenium WebDriver ---
chrome_options = Options()
- chrome_options.add_argument("--headless") # Run in headless mode
+ if headless:
+ chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
+ chrome_options.add_argument("--start-maximized") # Better for non-headless
chrome_options.add_argument("--window-size=1920,1080")
- # Use webdriver-manager for cross-platform compatibility
effective_os = os.environ.get('EFFECTIVE_OS', sys.platform)
if effective_os == 'darwin':
service = Service(ChromeDriverManager().install())
else:
- # Assumes chromedriver is in PATH for Linux/other environments
service = Service()
- logger.info("🚀 Initializing Chrome driver with Selenium-Wire...")
+ logger.info(f"🚀 Initializing Chrome driver (Headless: {headless})...")
driver = wire_webdriver.Chrome(service=service, options=chrome_options)
# --- 3. Scrape the Page ---
- logger.info(f" navigatin to: {url}")
+ logger.info(f" navigating to: {url}")
driver.get(url)
- await asyncio.sleep(3) # Wait for JS to render
+ await asyncio.sleep(3)
# --- 4. Capture Artifacts ---
- # DOM
dom_path = output_dir / "dom.html"
- dom_content = driver.execute_script("return document.documentElement.outerHTML;")
- dom_path.write_text(dom_content, encoding='utf-8')
+ dom_path.write_text(driver.execute_script("return document.documentElement.outerHTML;"), encoding='utf-8')
artifacts['dom'] = str(dom_path)
- # Source
source_path = output_dir / "source.html"
source_path.write_text(driver.page_source, encoding='utf-8')
artifacts['source'] = str(source_path)
- # Headers
main_request = next((r for r in driver.requests if r.response and r.url == url), driver.last_request)
if main_request and main_request.response:
headers_path = output_dir / "headers.json"
headers_path.write_text(json.dumps(dict(main_request.response.headers), indent=2))
artifacts['headers'] = str(headers_path)
- # Screenshot
if take_screenshot:
screenshot_path = output_dir / "screenshot.png"
driver.save_screenshot(str(screenshot_path))
@@ -129,3 +127,4 @@ async def selenium_automation(params: dict) -> dict:
if driver:
driver.quit()
logger.info("Browser closed.")
+# END: selenium_automation
[mike@nixos:~/repos/pipulate]$
YES! The browser popped up and it worked exactly as intended:
--- 🧪 Starting Advanced Scrape Test Flight ---
-> Target: https://mikelev.in/
-> Headless Mode: False
/home/mike/repos/pipulate/.venv/lib/python3.12/site-packages/seleniumwire/thirdparty/mitmproxy/contrib/kaitaistruct/tls_client_hello.py:10: UserWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html. The pkg_resources package is slated for removal as early as 2025-11-30. Refrain from using this package or pin to Setuptools<81.
from pkg_resources import parse_version
-> ✅ Success! Advanced scrape complete.
-> Artifacts captured in 'browser_automation/looking_at/':
- dom: browser_automation/looking_at/mikelev.in/%2F/dom.html
- source: browser_automation/looking_at/mikelev.in/%2F/source.html
- headers: browser_automation/looking_at/mikelev.in/%2F/headers.json
- screenshot: browser_automation/looking_at/mikelev.in/%2F/screenshot.png
--- 🧪 Test Flight Complete ---
[mike@nixos:~/repos/pipulate/Notebooks/Client_Work/faquilizer2/browser_automation/looking_at]$ ls
mikelev.in
[mike@nixos:~/repos/pipulate/Notebooks/Client_Work/faquilizer2/browser_automation/looking_at]$ cd mikelev.in/
[mike@nixos:~/repos/pipulate/Notebooks/Client_Work/faquilizer2/browser_automation/looking_at/mikelev.in]$ ls
%2F
[mike@nixos:~/repos/pipulate/Notebooks/Client_Work/faquilizer2/browser_automation/looking_at/mikelev.in]$ cd %2F/
[mike@nixos:~/repos/pipulate/Notebooks/Client_Work/faquilizer2/browser_automation/looking_at/mikelev.in/%2F]$ ls
dom.html headers.json screenshot.png source.html
[mike@nixos:~/repos/pipulate/Notebooks/Client_Work/faquilizer2/browser_automation/looking_at/mikelev.in/%2F]$ cat headers.json
{
"server": "GitHub.com",
"content-type": "text/html; charset=utf-8",
"last-modified": "Fri, 10 Oct 2025 16:59:13 GMT",
"access-control-allow-origin": "*",
"etag": "W/\"68e93b61-a6d68\"",
"expires": "Fri, 10 Oct 2025 22:39:16 GMT",
"cache-control": "max-age=600",
"content-encoding": "gzip",
"x-proxy-cache": "MISS",
"x-github-request-id": "7C42:33E725:D01F1:EDDAE:68E988BC",
"accept-ranges": "bytes",
"age": "0",
"date": "Fri, 10 Oct 2025 22:29:16 GMT",
"via": "1.1 varnish",
"x-served-by": "cache-ewr-kewr1740023-EWR",
"x-cache": "MISS",
"x-cache-hits": "0",
"x-timer": "S1760135356.306289,VS0,VE39",
"vary": "Accept-Encoding",
"x-fastly-request-id": "f0d2895baa0e6ed17199a3ecbf4f0f807d12378e",
"content-length": "166580"
}
[mike@nixos:~/repos/pipulate/Notebooks/Client_Work/faquilizer2/browser_automation/looking_at/mikelev.in/%2F]$
Uhhh… wow. Just, wow.
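Those %2F folder names fall straight out of the new path logic. A quick sketch of the same urlparse/quote mapping the patch uses (artifact_dir is just an illustrative helper name, not something in the codebase):

from urllib.parse import urlparse, quote

def artifact_dir(url: str) -> str:
    parsed = urlparse(url)
    path = parsed.path or '/'
    # safe='' percent-encodes everything, including slashes, so each URL maps to one flat folder
    return f"browser_automation/looking_at/{parsed.netloc}/{quote(path, safe='')}"

print(artifact_dir("https://mikelev.in/"))            # browser_automation/looking_at/mikelev.in/%2F
print(artifact_dir("https://mikelev.in/blog/post/"))  # browser_automation/looking_at/mikelev.in/%2Fblog%2Fpost%2F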
Validation and Confirmation of Success
Gemini: Fantastic! I’m really glad we got that working exactly as intended. Seeing the browser pop up and having the file structure turn out perfectly clean is a great result. That sets a solid, reliable foundation for all the browser automation tasks ahead.
It’s amazing what a little refactoring and sticking to a deterministic plan can do. We’ve gone from a confusing, complex implementation to something simple, predictable, and extensible in just a couple of iterations.
Since we’ve achieved the main goals for this task, is there anything else you’d like to refine or build on from here?
Refining the API: Headless Mode and Deterministic Paths
Me: Repress or address:
/home/mike/repos/pipulate/.venv/lib/python3.12/site-packages/seleniumwire/thirdparty/mitmproxy/contrib/kaitaistruct/tls_client_hello.py:10: UserWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html. The pkg_resources package is slated for removal as early as 2025-11-30. Refrain from using this package or pin to Setuptools<81.
from pkg_resources import parse_version
We can just override the warning if that’s easiest.
Gemini: You should definitely repress it.
That UserWarning is the perfect example of noise from a third-party dependency that you shouldn’t try to fix. Addressing it would mean patching code deep inside selenium-wire’s dependencies, which would be a fragile and temporary fix.
The best practice here is to acknowledge the warning and then cleanly suppress it in your own code, so your output remains clean while you wait for the upstream packages to update.
The Plan
We can add a warning filter to the spot where the pip object is instantiated for notebooks. This will solve the problem globally for all your notebook sessions without cluttering your workflow or tool-specific code.
The ideal place for this is at the top of /home/mike/repos/pipulate/pipulate/pipulate.py.
The Code
Here’s the change. We’ll import the warnings module and add a specific filter for that pkg_resources UserWarning.
Please replace the contents of /home/mike/repos/pipulate/pipulate/pipulate.py with this updated code:
# /home/mike/repos/pipulate/pipulate/pipulate.py
# START: pipulate_factory_content
import os
import sys
from pathlib import Path
from loguru import logger
import warnings # <-- ADD THIS LINE

from .core import Pipulate

# 🎯 NOTEBOOK LOGGING: Configure logger for a cleaner Jupyter experience.
logger.remove()
logger.add(sys.stderr, level="WARNING")

# Suppress the specific UserWarning from pkg_resources in selenium-wire
warnings.filterwarnings(
    "ignore",
    category=UserWarning,
    message="pkg_resources is deprecated as an API.*"
) # <-- ADD THIS BLOCK

def _find_project_root(start_path):
    """Find the project root by looking for the flake.nix file."""
    current_path = Path(start_path).resolve()
    while current_path != current_path.parent:
        if (current_path / 'flake.nix').exists():
            return current_path
        current_path = current_path.parent
    return None

def _get_db_path():
    """
    Determines the correct database path for standalone (notebook) usage.
    - If run from within the dev repo, it uses the local `data/` directory.
    - If run as an installed package, it uses a stable, hidden `~/.pipulate/` directory.
    """
    project_root = _find_project_root(os.getcwd())

    if project_root:
        # We are in the dev environment. Use the existing logic.
        app_name_file = project_root / 'whitelabel.txt'
        if app_name_file.exists():
            app_name = app_name_file.read_text().strip().lower()
        else:
            # Fallback to directory name if whitelabel.txt doesn't exist
            app_name = project_root.name.lower()
        # Always use the dev database for notebook work inside the repo
        return project_root / f'data/{app_name}_dev.db'
    else:
        # We are likely in an installed package environment (e.g., via pip).
        # Create a stable, user-specific location for the database.
        home_dir = Path.home()
        pipulate_dir = home_dir / '.pipulate'
        pipulate_dir.mkdir(exist_ok=True)
        return pipulate_dir / 'pipulate.db'

# The "factory" instantiation. This code runs when this module is imported.
# It creates a single, pre-configured instance of the Pipulate class.
db_path = _get_db_path()

# This is the magic `pip` object for notebooks.
pip = Pipulate(db_path=str(db_path))
# END: pipulate_factory_content
After making this change, restart your Jupyter kernel. The warning should now be gone, leaving your test flight output perfectly clean. This is the most idiomatic and maintainable way to handle this.
Suppressing the pkg_resources UserWarning
Me: Perfect, done and tested.
--- 🧪 Starting Advanced Scrape Test Flight ---
-> Target: https://mikelev.in/
-> Headless Mode: False
-> ✅ Success! Advanced scrape complete.
-> Artifacts captured in 'browser_automation/looking_at/':
- dom: browser_automation/looking_at/mikelev.in/%2F/dom.html
- source: browser_automation/looking_at/mikelev.in/%2F/source.html
- headers: browser_automation/looking_at/mikelev.in/%2F/headers.json
- screenshot: browser_automation/looking_at/mikelev.in/%2F/screenshot.png
--- 🧪 Test Flight Complete ---
This is what success looks like:
[mike@nixos:~/repos/pipulate]$ git commit -am "Selenium automation working from Notebook with headless False and encoded paths used for scrape folder names"
[main 2b2344f0] Selenium automation working from Notebook with headless False and encoded paths used for scrape folder names
4 files changed, 95 insertions(+), 48 deletions(-)
[mike@nixos:~/repos/pipulate]$ git push
Enumerating objects: 15, done.
Counting objects: 100% (15/15), done.
Delta compression using up to 48 threads
Compressing objects: 100% (8/8), done.
Writing objects: 100% (8/8), 2.71 KiB | 1.36 MiB/s, done.
Total 8 (delta 6), reused 0 (delta 0), pack-reused 0 (from 0)
remote: Resolving deltas: 100% (6/6), completed with 6 local objects.
To github.com:miklevin/pipulate
6a26e626..2b2344f0 main -> main
[mike@nixos:~/repos/pipulate]$
But I really should use that whole feat convention thing when naming my git messages. Still working on that. Anyhoo, we can start the refining!
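For what it’s worth, a Conventional Commits rendering of that message might look something like this (the feat(scrape) scope is only an illustration, not what was actually committed):

git commit -am "feat(scrape): add headless toggle and domain/%2Fpath artifact folders"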
Book Analysis
AI Editorial Take
This entry represents a compelling narrative of practical AI-assisted code improvement. It effectively frames a common developer challenge (technical debt from initial over-engineering) and provides a clear, documented path to resolution using a structured, deterministic approach. The back-and-forth interaction with the human author showcases the AI’s ability to not only generate solutions but also debug its own output and adapt to nuanced requirements, like precise file paths and non-headless browser modes. This article strongly emphasizes the value of precise tool usage and iterative refinement, making it an excellent case study for AI-driven software development.
Title Brainstorm
- Title Option: Surgical Refactoring: From Selenium Sprawl to Precision Automation
  - Filename: selenium-refactoring-precision-automation.md
  - Rationale: Captures the ‘surgical’ nature of the AI-guided refactoring, the problem (Selenium sprawl/complexity), and the desired outcome (precision automation).
- Title Option: Streamlining Selenium: A Deterministic Approach to Browser Automation
  - Filename: streamlining-selenium-deterministic-automation.md
  - Rationale: Focuses on the ‘streamlining’ and ‘deterministic’ aspects, key themes in the author’s philosophy.
- Title Option: Beyond the ‘Bull in a China Shop’: AI-Driven Selenium Clean-Up
  - Filename: ai-driven-selenium-cleanup.md
  - Rationale: Uses the author’s vivid metaphor and highlights the AI’s role in the clean-up.
- Title Option: Pipulate’s Browser Automation Renaissance: A Refactoring Journey
  - Filename: pipulates-browser-automation-renaissance.md
  - Rationale: Positions the work as a significant improvement and a journey, suitable for a journal entry.
- Title Option: Taming Selenium: Cleaner Browser Automation with ai_edit.py
  - Filename: taming-selenium-ai-edit-automation.md
  - Rationale: Highlights the tool (ai_edit.py) and the challenge (Taming Selenium).
Content Potential And Polish
- Core Strengths:
- Demonstrates effective AI-human collaboration in code refactoring.
- Highlights a practical approach to resolving technical debt.
- Showcases deterministic, AST-validated patching for code modifications.
- Provides concrete examples of improved Selenium browser automation implementation.
- Illustrates clear problem identification and iterative solution development.
- Emphasizes clean architecture, modularity, and testability.
- Documents the journey from initial confusion to a clean, working solution.
- Suggestions For Polish:
- Include a high-level architectural diagram (ASCII or simple) showing pip.scrape() -> scraper_tools.py (new) -> Selenium.
- Briefly explain why the old scraper_tools.py was ‘a complete session hijacker’ and why that was problematic, to fully justify the ‘bull in a china shop’ approach.
- Add a small section on how auto_tool works and why it’s beneficial for the ‘tool-calling architecture’.
- Discuss the benefits of the new directory structure (domain/%2Fpath) in more detail for context-setting.
- Add a quick note on selenium-wire vs. regular selenium and its specific benefits for scraping (e.g., access to requests/headers).
Next Step Prompts
- Now that the Selenium mode is robust, implement the mode='requests' functionality within pipulate/core.py’s scrape method. Design this using httpx for efficiency and include basic header and content capture, returning a similar artifact structure.
- Create a new tool, screenshot_diff_tool.py, that takes two screenshot paths generated by pip.scrape() and visually compares them, returning a heatmap of differences and a percentage change for visual regression testing.