Real-Time Development of Pipulate for SEO
This article documents the real-time development process of a feature within a software project called “Pipulate,” designed for Search Engine Optimization (SEO) tasks.
Integration with Botify
The specific goal is to integrate with “Botify,” a well-known enterprise SEO analytics platform, to enable users to export large datasets (specifically CSV files) using Botify’s Application Programming Interface (API).
Technical Challenges and Design Decisions
The author details the technical challenges, design decisions, and implementation steps involved, including parsing specific web addresses (URLs), handling data limits, interacting with different modes of the Botify API (known as BQL), and managing the asynchronous nature of large data export jobs.
Modern Software Development Practices
The text also provides a fascinating look into modern software development practices, heavily featuring the use of Artificial Intelligence (AI) coding assistants (like ChatGPT, Gemini, Grok, and Claude, often within an editor called Cursor) to generate code, provide implementation plans, and troubleshoot issues.
Iterative Problem Solving and Dependency Management
It highlights the iterative process of breaking down complex problems, managing dependencies, choosing development environments (Python scripts vs. Jupyter Notebooks), and the practical considerations of building a tool intended for real-world use, balancing immediate needs with long-term goals for the software.
Top Priority: Downloading CSVs From Botify
Okay, this is the weekend that the coworker-facing version of Pipulate is born. That means it will have a Botify emphasis coming out of the starting gate, because that's my day-job and that's what pays the bills. A free and open source SEO product for the general public will be close on its heels. But I have to prioritize the bill-paying features first.
Managing Implementation Speed and Expectations
Speed of implementation this weekend is an issue. How many projects of the sort that truly have an accelerating effect, good choices per the 80/20-rule, can I really plow through this weekend so that come Monday, my world is changed forever? The reality is always many fewer than the vision in my head. Be conservative with what I can get done, but push hard.
The 1, 2, 3… 1? Methodology
We identify the first project in the 1, 2, 3… 1?-step methodology, in which we are always really on step 1 because time is linear that way, and the read/write head of the Turing machine of life is always on the now moment. That's what we control, and the strongest move forward is in the effective use of the now-moment. That step 1 is the CSV file downloads from Botify.
Handling Nested Dependencies
Okay, one of the interesting things about the now-moment is that when you set out to do the thing immediately before you, nested dependencies shake out. And these nested dependencies might be deep rabbit holes undermining your overall effectiveness, sending you on wild goose chases and after red herrings. The idea is to not dive deep into nested dependencies. Dispense with them quickly.
Development Environment Strategy
Doing a CSV export from Botify has a lot of steps to it, and these are those dependencies. I will blast through them here, breaking out the tiny sub-projects into `.py`-files, which I find I get the best AI help with when it's not actually a Jupyter Notebook in Cursor. While you can use Notebooks in Cursor and VSCode, I find it confuses the AI assistants, which have enough to keep track of. This will probably change over time, but that's the state of affairs right now. So for good, solid AI assist, it's `.py`-files in Cursor.
Balancing Jupyter Notebooks and AI Assistance
However, I cannot break my Jupyter Notebooks addiction, and when doing things that are very much Pandas data-pipeline-esque, I must have a Notebook where I'm building things up cell by cell. Those who have been bitten by the Notebook bug know what I'm talking about; data scientists and financial people. And the Notebook world and the VSCode/AI world — despite JupyterAI and despite the VSCode forks technically supporting it — don't coexist well. And so, I have an AI-less instance of JupyterLab (the one provided by Pipulate) running side-by-side with Cursor. I mock up code in Cursor with AI assistance in a `.py`-file, and copy-paste it to the appropriate cell in a separate `.ipynb`-file in Jupyter.
Organizing Development Files
As far as the file locations where I do this, a sort of scratchpad, I now have the `pipulate/precursors/` location, which holds Jupyter Notebook `.ipynb` or executable script `.py` files on their way to becoming Pipulate Workflows or CRUD apps. This is a location that can get a bit messy, and I'll have to clean it up and bring order to it over time, as useful utilities will live in there for a while until they find better homes.
Project Acceleration and Context Management
Project acceleration starts here. Actually, it started yesterday by allowing articles like this to be included in super-prompts that cherry-pick parts of the overarching repo to put in context, plus an article like this, plus a manifest of included files and their token sizes near the beginning so the LLM knows what it's dealing with, plus a pre-prompt and a post-prompt to sandwich the considerable chunk of information. One prompt at the beginning sets the stage and the prompt at the end reminds the LLM what it's doing, because LLMs tend to forget their original purpose after parsing through a huge chunk of tokens full of distracting embedded stories.
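To make that concrete, here is a minimal sketch of how such a super-prompt might be assembled. This is not the actual `context_foo.py` implementation, just the general shape; the helper names and the `tiktoken` dependency are assumptions for illustration.

```python
# Hypothetical sketch: pre-prompt, manifest, files, article, post-prompt.
import tiktoken  # assumed tokenizer; any rough token counter would do


def count_tokens(text: str) -> int:
    # Rough token count so the manifest can tell the LLM what it is about to read.
    return len(tiktoken.get_encoding("cl100k_base").encode(text))


def build_super_prompt(pre_prompt: str, file_paths: list, article_path: str, post_prompt: str) -> str:
    manifest, sections = [], []
    for path in [*file_paths, article_path]:
        text = open(path).read()
        manifest.append(f"• {path} ({count_tokens(text):,} tokens)")
        sections.append(f"--- {path} ---\n{text}")
    # The manifest goes near the beginning so the LLM knows what it's dealing with,
    # and the post-prompt at the end reminds it of the task after the long read.
    return "\n\n".join([pre_prompt, "--- Files Included ---", *manifest, *sections, post_prompt])
```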
Breaking Down the Implementation
That was yesterday's work. That's done, and it's time to put it to excellent use today. The nested dependencies are numerous. Instead of the giant context-foo trick, I think I'll start with tiny chisel-strike projects. Each one is interesting in its own right and could probably justify a number of smaller articles, each zooming in on its micro-challenge, but I'm not taking the time to organize it like that. This is like hurdling over obstacles in a race. Go!
Parsing Botify Project URLs
The first step in the Pipulate Workflow, or cell in the Jupyter Notebook, will be to take a Botify “project” URL of almost any variety and strip it down to the basics, because none of it is needed except for the organization (org) and project “slugs” that can be extracted out of it. So we start with something like:
https://app.botify.com/example-org/example.com/crawl?analysisSlug=20250413&comparisonAnalysisSlug=20250330
…in which `example-org` is the organization slug and `example.com` is the project slug. In Python f-string terms, the pattern we are reconstructing given a URL of this type is:

`f"https://app.botify.com/{org}/{project}/"`
Understanding Pipulate Workflow Data Structure
Okay, so the way data ends up in a Pipulate workflow record in the JSON data blob field is:
{"created": "2025-04-19T10:51:15.038580", "updated": "2025-04-19T10:51:33.325942", "step_01": {"name": "Mike"}, "step_02": {"greeting": "Hello Mike"}, "finalize": {"finalized": true}}
The structure is that of a nested dictionary. While `step_01` only has `{"name": "Mike"}`, there's no reason it couldn't have:
{"url": "https://app.botify.com/example-org/example.com/", "org": "example-org", "project": "example.com"}
Understanding Step Data Storage in Pipulate
So, a single step, aka card, in a Pipulate pipeline can have multiple key/value pairs that it stores, but only one of them indicates that the step is "done". In this way, even a single step can have multiple progressive sub-steps, or even if it's a one-shot step, it can record multiple datapoints. Now, while it might seem redundant to store the canonicalized project URL plus the org and project broken out separately, the benefit is simply not having to repeat the extraction or concatenation code elsewhere when they need to be used. They can be grabbed directly out of their respective fields.
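Here is a small sketch of what that read-back looks like, using the values from the JSON example above; the assumption that `url` is the designated "done" field matches where the workflow ends up later in this article.

```python
# Sketch: pulling a multi-field step record back out of the workflow state blob.
state = {
    "step_01": {
        "url": "https://app.botify.com/example-org/example.com/",
        "org": "example-org",
        "project": "example.com",
    }
}

step_data = state.get("step_01", {})
org, project = step_data["org"], step_data["project"]  # no extraction code repeated
is_done = "url" in step_data  # the field named in the step's 'done' slot marks completion
print(org, project, is_done)
```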
Simplifying the Development Process
This step is really me getting over a hump. I have been remiss in doing this step because of general confusion regarding execution environments, mixed concerns and the like. So what I'm doing is distilling it down to its pure essence, Python-wise, and putting it in a file named `precursors/temp.py`, whose mere name is a clue to the AI assistants involved that they also need not concern themselves with all that outer context stuff. It's a temporary workspace and scratchpad where we take as input:
https://app.botify.com/example-org/example.com/crawl?analysisSlug=20250413&comparisonAnalysisSlug=20250330
…and output:
```
url: https://app.botify.com/example-org/example.com/
org: example-org
project: example.com
```
…and you need not even concern yourself with that Pipulate pipeline JSON data structure or anything. We will tackle that next, and this is just to give you context, and to do this first bit of Python code in complete isolation.
Go.
Canonicalizing Botify Project URLs
If you keep your choice on your favorite model, your Cursor AI "slow mode" may be very slow indeed. But if you switch it to "default", it will plow right through as if you're not into your zillionth completion for the month.
Default Cursor Model: I understand you want to create a simple Python function that takes a Botify URL and extracts the organization and project slugs, along with the canonicalized base URL. Let me help you create this in `precursors/temp.py`.

First, I'll create the file with a function that handles this URL parsing:
```python
# pipulate/precursors/temp.py
from urllib.parse import urlparse


def parse_botify_url(url: str) -> dict:
    """
    Parse a Botify URL to extract organization and project slugs.

    Args:
        url (str): A Botify URL, e.g. https://app.botify.com/example-org/example.com/crawl?analysisSlug=20250413

    Returns:
        dict: Dictionary containing:
            - url: Canonicalized base URL
            - org: Organization slug
            - project: Project slug
    """
    parsed = urlparse(url)
    path_parts = parsed.path.strip('/').split('/')

    if len(path_parts) < 2:
        raise ValueError("Invalid Botify URL format")

    org = path_parts[0]
    project = path_parts[1]

    # Construct canonicalized base URL
    base_url = f"https://{parsed.netloc}/{org}/{project}/"

    return {
        "url": base_url,
        "org": org,
        "project": project
    }


# Example usage
if __name__ == "__main__":
    test_url = "https://app.botify.com/example-org/example.com/crawl?analysisSlug=20250413&comparisonAnalysisSlug=20250330"
    result = parse_botify_url(test_url)
    print(f"url: {result['url']}")
    print(f"org: {result['org']}")
    print(f"project: {result['project']}")
```
I've created a simple but robust function that:
- Takes a Botify URL as input
- Uses Python's built-in `urllib.parse` to properly parse the URL
- Extracts the organization and project slugs from the path
- Constructs the canonicalized base URL
- Returns a dictionary with all three pieces of information
The function includes:
- Type hints for better code clarity
- Docstring documentation
- Input validation
- Example usage that can be run directly
You can test this by running the file directly. It will output:

```
url: https://app.botify.com/example-org/example.com/
org: example-org
project: example.com
```
This is a clean, focused implementation that handles just the URL parsing task. It's ready to be integrated into your larger workflow system, but for now it's isolated in `precursors/temp.py` as requested.
Would you like me to add any additional features or make any modifications to this implementation?
The Big Frontier AI Models Showing Me What They’ve Got
Anyone who has been following along (nobody, I suspect) knows that I have a technique to turn a whole friggin' article like this into context for a super-prompt that also includes all the necessary bits from a git repo. The next part of this article is me using that technique to get some next-step advice from the big boys: ChatGPT, Gemini, Grok and Claude.
Okay, this is excellent. It is like we created a `.py`-file in isolation from everything else. This is for clarity of thought and for deconstructing / reconstructing the entire Pipulate process. Anyone can do this. Lay out a simple programming challenge and get the AI assistant to give you a solution in a simple, generic Python execution context. Done!
I am tempted to go to a Jupyter Notebook at this point, but the complexity isn’t sufficient. I can jump right to altering the Pipulate “Hello World” template example into this example. It will accomplish a few critical things:
- Create the most basic example of modifying the Hello World workflow template into a new workflow.
- Alter the simple collection and recording of a single data-field from the Hello World example (name) into the collection of a throw-away data-point that gets parsed into 3 data-points that actually get recorded for that step.
- Re-populate the data-input field from `step_01` with the canonicalized URL instead of the "dirty" one, even though we've moved on to step 2.
- Set the stage for step 2 by displaying org and project from step 1, knowing that it will have used those to do an API call, but which we didn't get to yet.
To do this step, we are very-much no longer working in isolation. The system absolutely needs to know about Pipulate, and my question at this point is how to frame this challenge to the AIs. Do I do it in Cursor, just feeding it this article thus far, drag-copying the bits into context that I think it will need and let it churn through in Agent mode? Or do I use my context-foo process and do roughly the equivalent, showing these genius-level models out there that can take such a big token prompt and see what they do?
Well naturally, to gather the best information and to use this as a sort of personal benchmark, we do the latter first, and then the former. And so…
I go into `context_foo.py` and I change:
```python
FILES_TO_INCLUDE = """\
README.md
flake.nix
server.py
plugins/20_hello_workflow.py
training/hello_workflow.md
plugins/30_tasks.py
training/tasks.md
.cursorrules
""".splitlines()[:-1]  # Remove the last empty line
```
…into:
```python
FILES_TO_INCLUDE = """\
README.md
flake.nix
server.py
plugins/20_hello_workflow.py
training/hello_workflow.md
.cursorrules
""".splitlines()[:-1]  # Remove the last empty line
```
…because it doesn’t really need to have an understanding of the Rails-like CRUD apps for this task. But it should understand everything else about the system, especially the Hello World workflow.
And now, I run this command:
```bash
python context_foo.py --article-mode --article-path /home/mike/repos/MikeLev.in/_posts/2025-04-19-export-botify-csv-api.md
```
Hello AI! Show me what you’ve got.
What The AIs See
Below is an abbreviated version of the prompt the AIs see. Of course the codebase itself is removed, but this shows you the pre-prompt and an idea of how big the overall prompt is from the token count.
```
--- Pre-Prompt ---
System Information:

You are about to review a codebase in preparation for implementing changes requested in an article.
Please study and understand the codebase thoroughly, as you will need this context
to provide meaningful implementation suggestions based on the article's requirements.

Key things to know about this codebase:
- It uses a hybrid approach with Nix for system dependencies and virtualenv for Python packages
- Always run `nix develop` before any commands in a new terminal
- FastHTML objects must be converted with to_xml() before returning responses
- The project is organized as a server with plugin-based workflows

Key Points:
• Focus on understanding how the codebase currently implements related functionality
• Note any existing patterns that could be leveraged for the requested changes
• Consider how the requested changes would integrate with the current architecture

--- Files Included ---
• README.md
• flake.nix
• server.py
• plugins/20_hello_workflow.py
• training/hello_workflow.md
• /home/mike/repos/MikeLev.in/_posts/2025-04-19-export-botify-csv-api.md

--- Post-Prompt ---

--- Token Summary ---
Total tokens: 49,119 tokens (≈$1.47 at GPT-4 rates)
Maximum allowed: 2,087,152 tokens (≈$62.61 at GPT-4 rates)
Remaining: 2,038,033 tokens (≈$61.14 at GPT-4 rates)
```
OpenAI ChatGPT o4-mini-high Shows Me What It’s Got
I start this party with the new kid on the block, OpenAI’s o4-mini-high. The first surprising thing is that it could take the entire super-prompt pasted into the prompt text-box (versus an attachment). It’s surprising because when new “high reasoning” models are introduced, they usually have smaller token windows. o4-mini-high gave the most abbreviated or summarized result.
ChatGPT o4-mini-high: Below is a roadmap for turning the Botify‑CSV export article into a full Pipulate workflow, organized into three sections—Required Changes, Integration Strategy, and Implementation Plan—and addressing each of your questions.
1. Required Changes
1.1. New Workflow Plugin
- Create a new plugin file, e.g. `plugins/30_botify_export.py`, by copying `20_hello_workflow.py` and renaming:
  - Class → `BotifyExportFlow`
  - `APP_NAME` → `"botify_export"`
  - `DISPLAY_NAME` → `"Botify CSV Export"`
  - `TRAINING_PROMPT` → point at a new markdown in `training/botify_export.md`
- Define two steps instead of the Hello "name/greeting":
  - step_01 (`done='parsed'`)
    - Input: raw Botify URL
    - On submit: parse into `{ url, org, project }`, write all three into `state['step_01']` (see "Multi-field state" below)
  - step_02 (`done='csv_url'`)
    - On render: read `state['step_01']`, call the Botify API to fetch/export CSV, record the returned CSV download URL in `state['step_02']`
1.2. Parsing Utility
- Add a small helper to turn a full Botify URL into its base URL, org, and project.
- Option A: Embed directly in the plugin.
- Option B: Create `pipulate/precursors/parse_botify.py` and import.
1.3. Multi‑Field Step State
By default `pip.update_step_state` only stores one key/value per step. To record three fields:
- In `step_01_submit`:

  ```python
  # pseudocode inside your plugin
  parsed = parse_botify_url(user_val)
  state = pip.read_state(pipeline_id)
  state['step_01'] = {
      'url': parsed['url'],
      'org': parsed['org'],
      'project': parsed['project'],
      'parsed': parsed['url'],  # satisfy the `done` field
  }
  pip.write_state(pipeline_id, state)
  ```
- Skip calling `pip.update_step_state` here in favor of manual `pip.write_state`.
1.4. API Integration
- In `step_02_submit`:
  - `ctx = pip.get_step_data(pipeline_id, 'step_01', {})`
  - Build the Botify API request (e.g. `f"https://app.botify.com/api/export?org={ctx['org']}&project={ctx['project']}&..."`)
  - Use `aiohttp` (already in dependencies) to `POST`/`GET` the CSV export.
  - Extract the download URL from the response JSON.
  - Record it in `state['step_02']` similarly, then `pip.write_state`.
1.5. Training & UX
- Add `training/botify_export.md` describing the workflow to the user-assistant.
- Update your `ENDPOINT_MESSAGE` to explain the two-step Botify export.
2. Integration Strategy
2.1. Follow Existing Patterns
- Routing & Registration: copy the route-registration loop in `__init__` exactly as in HelloFlow.
- HTMX Rendering: return `Div`, `Card`, `Form`, etc., with `hx_get`/`hx_post` attributes as you see in `20_hello_workflow.py`.
- State Management: continue using `pip.initialize_if_missing`, then manual `pip.write_state` only for multi-field steps.
2.2. Conventions
- Numeric Prefix: name your file `30_botify_export.py` so it appears after `20_hello_workflow.py`.
- FastHTML Rendering: always return FT objects, then let the server do `to_xml(...)` under the hood.
- List of Pipeline IDs: use `pip.update_datalist("pipeline-ids", options=...)` to keep the "resume" dropdown in sync.
2.3. Backward Compatibility
- Isolate all Botify logic in its own plugin—no core changes.
- Don't modify `server.py` or the global `Pipulate` class.
- Ensure your new plugin gracefully handles missing or malformed URLs (validation).
3. Implementation Plan
| Milestone | Details & Checks |
|---|---|
| 1. Scaffolding | • Copy HelloFlow → `30_botify_export.py` • Create training prompt markdown |
| 2. Parsing Utility | • Embed or import `parse_botify_url()` • Write and unit-test locally |
| 3. Step 1: URL Parsing | • Define `step_01` & `step_01_submit` • Manual state write for 3 fields • Validate with bad / good URLs |
| 4. Step 2: CSV Export Call | • Implement `step_02` & `step_02_submit` • Use `aiohttp` to call Botify's CSV API • Record and surface download link |
| 5. UI & Messaging | • Update `ENDPOINT_MESSAGE` and `step_messages` • Enqueue user guidance via `self.message_queue` |
| 6. End-to-End Testing | • In the browser, launch workflow, input URL, see parse, see CSV link • Test "resume" with dropdown |
| 7. Documentation & Training | • Finish `training/botify_export.md` • Review README to mention new workflow |
| 8. Merge & Release | • Ensure no regressions in existing workflows • Tag and ship a patch release |
Potential Challenges & Mitigations
- Multi‑field state: bypass helper for step 1—manually manage JSON blob.
- Botify API quirks: may require authentication/token; consider adding a credentials table or prompting the user.
- HTMX race conditions: ensure that your `hx_target` IDs match the container's `id` exactly.
By isolating all Botify logic in its own workflow plugin and reusing the existing FastHTML + HTMX patterns, you’ll keep the core architecture untouched, preserve backward compatibility, and deliver a clear, two‑step CSV export experience.
Gemini 2.5 Experimental Shows Me What It’s Got
Gemini used to have the distinction of being the only model able to take a prompt this large in its text input field for prompts. All the other models used to require it be added as an attachment. Both o4-mini-high and Grok 3 now join the ranks of Gemini in this regard. I like Gemini's very detailed reply, replete with code examples! Gemini 2.5 just continues to knock my socks off.
Gemini 2.5: Okay, I have thoroughly reviewed the codebase (`README.md`, `flake.nix`, `server.py`, `plugins/20_hello_workflow.py`, `training/hello_workflow.md`) and the article (`2025-04-19-export-botify-csv-api.md`).
The core requirement from the article is to implement the first step of a new Pipulate workflow named “Botify CSV Export”. This first step involves:
- Accepting a potentially messy Botify project URL as input.
- Parsing this URL to extract the organization slug (`org`) and project slug (`project`).
- Canonicalizing the URL to the base project URL (`https://app.botify.com/{org}/{project}/`).
- Storing these three pieces of data (`url`, `org`, `project`) in the state for the first step (`step_01`).
- Using the canonical URL (`url`) as the field that marks `step_01` as "done".
- Refilling the input field with the canonical URL upon reverting or re-displaying the step.
- Setting the stage for a subsequent step (`step_02`) which would display the extracted `org` and `project` (though the API interaction for step 2 is not part of this initial implementation request).
The article explicitly suggests using `plugins/20_hello_workflow.py` as a template and provides the core Python logic for parsing the URL in an isolated `precursors/temp.py` example.
Here’s the implementation plan based on your codebase and the article’s requirements:
1. Required Changes
- New Workflow File:
- Copy
plugins/20_hello_workflow.py
toplugins/30_botify_export.py
(or another appropriate number reflecting menu order). - Rename the class inside from
HelloFlow
toBotifyExportFlow
.
- Copy
- Update Class Constants:
- Change
APP_NAME
to"botify_export"
. - Change
DISPLAY_NAME
to"Botify CSV Export"
. - Update
ENDPOINT_MESSAGE
to something like:"Enter a Botify project URL to extract project details and prepare for CSV export."
- Set
TRAINING_PROMPT
to"botify_export_flow.md"
(This file will need to be created later).
- Change
- Modify
__init__
:- Step Definition: Update the
steps
list definition:- Step 1: Redefine the first
Step
tuple:Step( id='step_01', done='url', # Marks step completion, stores canonical URL show='Botify Project URL', # UI Label refill=True, # Refill canonical URL on revert transform=None # Step 1 doesn't transform input from a prior step ),
- Step 2: Redefine the second
Step
tuple (for now, just confirming the parse):Step( id='step_02', done='confirmed', # Marks confirmation of parsed data show='Project Confirmation', # UI Label refill=False, # No input to refill here transform=None # Doesn't transform input, just uses step_01's data ),
- Remove any subsequent steps from the original HelloFlow template for now, ensuring the
finalize
step remains last.
- Step 1: Redefine the first
- Step Messages: Update
self.step_messages
forstep_01
andstep_02
to reflect the new purpose (e.g., asking for URL, confirming parsed details).
- Step Definition: Update the
- Implement URL Parsing:
- Add the
parse_botify_url
function (provided in the article’sprecursors/temp.py
example) as a method within theBotifyExportFlow
class (e.g.,_parse_botify_url
). This aligns with the WET workflow philosophy of keeping logic contained. - Add
from urllib.parse import urlparse
to the imports.
- Add the
- Modify
step_01()
(GET Request Handler):- Change the input field’s
placeholder
to indicate Botify URL input (e.g., “Enter any Botify Project URL”). - Ensure the input field’s
name
attribute is distinct (e.g.,"raw_url"
or"botify_url_input"
) as we’ll store the canonical URL under thestep.done
name ('url'
). - The
value
attribute should be populated withdisplay_value
, which usesstep_data.get(step.done, "")
- this correctly fetches the canonical URL if refilling. - Adjust the
H4
title (e.g., “Step 1: Enter Botify Project URL”).
- Change the input field’s
- Modify
step_01_submit()
(POST Request Handler):- Get the raw URL from the form using the new input name (e.g.,
raw_url = form.get("raw_url", "")
). - Call the
_parse_botify_url
method withraw_url
. - Validation: Check if parsing was successful. If
_parse_botify_url
raises aValueError
or returns None/error indicator, display an error message usingP(error_msg, style=pip.get_style("error"))
and return early. - State Update:
- If parsing is successful, get the
result
dictionary ({'url': ..., 'org': ..., 'project': ...}
). - Use
processed_val = result['url']
(the canonical URL). - Call
await pip.update_step_state(pipeline_id, step_id, processed_val, steps)
to store the canonical URL under thestep.done
key ('url'
), marking the step as complete. - Immediately after, fetch the current state, update the
step_01
dictionary to include theorg
andproject
, and write it back:state = pip.read_state(pipeline_id) if step_id not in state: state[step_id] = {} # Should exist after update_step_state, but safety check state[step_id]['org'] = result['org'] state[step_id]['project'] = result['project'] # Ensure canonical url is also there if update_step_state didn't add it somehow state[step_id]['url'] = result['url'] pip.write_state(pipeline_id, state)
- If parsing is successful, get the
- Feedback: Update the confirmation message sent via
self.message_queue.add
to show the canonical URL. - Ensure
pip.create_step_navigation
receives the canonical URL (processed_val
) so the revert control displays it correctly.
- Get the raw URL from the form using the new input name (e.g.,
- Modify
step_02()
(GET Request Handler):- Read the state for
step_01
:step1_data = pip.get_step_data(pipeline_id, 'step_01', {})
. - Extract
org = step1_data.get('org', 'N/A')
andproject = step1_data.get('project', 'N/A')
. - Display these values in a
Card
or similar element. - Provide a simple confirmation button (e.g., “Confirm Project Details & Continue”) that triggers
step_02_submit
. Example:return Div( Card( H4(f"{pip.fmt(step.id)}: Confirm Project Details"), P(f"Organization: {org}"), P(f"Project: {project}"), Form( Button("Confirm & Continue", type="submit", cls="primary"), hx_post=f"/{app_name}/{step.id}_submit", hx_target=f"#{step.id}" ) ), Div(id=next_step_id), # Placeholder for next step trigger id=step.id )
- Read the state for
- Modify
step_02_submit()
(POST Request Handler):- This step mainly serves as a confirmation gate.
- Update the state for
step_02
:await pip.update_step_state(pipeline_id, step_id, True, steps)
(usingTrue
for thestep.done
field ‘confirmed’). - Add a confirmation message to the chat queue.
- Return the standard navigation controls using
pip.create_step_navigation
.
- New Training File:
- Create
training/botify_export_flow.md
. Copy content fromtraining/hello_workflow.md
and adapt it to explain the Botify export workflow, focusing initially on the URL parsing step. Remember the “DOLPHIN” check for the LLM.
- Create
2. Integration Strategy
- Follow Existing Patterns: Adhere strictly to the workflow pattern established in
20_hello_workflow.py
: separate GET handlers (step_xx
) for displaying UI and POST handlers (step_xx_submit
) for processing input and updating state. - Use
Pipulate
Helpers: Leverage thepipulate
instance methods (initialize_if_missing
,read_state
,write_state
,revert_control
,validate_step_input
,update_step_state
,check_finalize_needed
,create_step_navigation
,rebuild
,run_all_cells
,get_step_data
,clear_steps_from
,finalize_workflow
,unfinalize_workflow
,stream
viamessage_queue.add
) wherever applicable. Modify behavior within the step handlers if a helper doesn’t quite fit (like storing multiple fields instep_01_submit
), rather than changing the core helpers themselves. - State Management: Store step-specific data within nested dictionaries under the step’s ID key (e.g.,
state['step_01'] = {'url': ..., 'org': ..., 'project': ...}
). Ensure the field named inStep.done
exists within that dictionary to signify completion. - UI Consistency: Use
Card
,Form
,Input
,Button
,P
,H4
, etc., consistent with20_hello_workflow.py
andserver.py
(e.g.,pip.wrap_with_inline_button
). - HTMX: Ensure all
hx-*
attributes correctly target elements using the newapp_name
(botify_export
) and step IDs. - Server Integration: The plugin discovery mechanism in
server.py
will automatically pick up the new30_botify_export.py
file (as long as it doesn’t start withxx_
or contain parentheses) and add it to the “APP” dropdown menu. - Backward Compatibility: Not applicable as this is a new, independent workflow.
3. Implementation Plan
- Copy & Rename: Duplicate
plugins/20_hello_workflow.py
toplugins/30_botify_export.py
. Rename the class toBotifyExportFlow
. - Update Constants: Modify
APP_NAME
,DISPLAY_NAME
,ENDPOINT_MESSAGE
,TRAINING_PROMPT
in the new class. - Implement Parser: Add the
_parse_botify_url
method to the class and necessary imports (urllib.parse
). - Modify
__init__
: Redefine thesteps
list forstep_01
andstep_02
as described above. Updatestep_messages
. - Implement Step 1: Modify
step_01()
(GET) andstep_01_submit()
(POST) to handle Botify URL input, parsing, validation, state storage (includingorg
,project
), and UI feedback (displaying canonical URL). - Implement Step 2: Modify
step_02()
(GET) to display parsedorg
andproject
and provide a confirmation button. Modifystep_02_submit()
(POST) to handle the confirmation and update state. - Create Training File: Create
training/botify_export_flow.md
with initial content explaining the workflow’s purpose and the first step. - Testing:
- Restart the server (
python server.py
). - Access the new “Botify CSV Export” app from the menu.
- Test submitting valid and invalid Botify URLs in Step 1.
- Verify that the correct canonical URL, org, and project are stored in the
pipeline
table’s JSON blob for the correspondingpkey
. - Verify Step 2 displays the correct
org
andproject
. - Test the revert functionality for Step 1 – ensure the input field is refilled with the canonical URL.
- Test finalizing the workflow (it should proceed directly from Step 2 confirmation).
- Check chat messages for correctness.
- Restart the server (
- Refinement: Adjust UI elements, messages, and error handling based on testing.
Potential Challenges/Risks:
- State Management Complexity: Ensuring the state for
step_01
correctly storesurl
,org
, andproject
while still usingurl
to mark completion requires careful handling withinstep_01_submit
. Double-check that reading/reverting works as expected. - URL Parsing Edge Cases: The provided
parse_botify_url
seems robust, but real-world Botify URLs might have variations. Thorough testing with different URL formats is needed. - HTMX Targeting: Errors in
hx-target
or element IDs can lead to UI updates failing silently or applying incorrectly. Careful inspection during testing is necessary. - WET Code Maintenance: While intentional, ensure the duplicated UI logic in GET/POST handlers remains manageable and consistent within this specific workflow file.
Grok 3 Shows Me What It’s Got
Grok 3 used to demand that I use a file upload, but now it is able to take the super-prompt directly in its text-input field, just like Gemini. This has the extra advantage of not having to provide extra "outer" meta-prompt instructions. The models are being tested on simply knowing what to do with a super-prompt like this, and Grok does! And Grok's responses are right up there with Gemini's, replete with code examples and specifics.
Grok 3:
Implementation Suggestions for Article Requirements
Based on the requirements outlined in the article “Export a Botify CSV With API,” here are specific suggestions for implementing the requested changes within the Pipulate framework. The goal is to adapt the “Hello World” workflow template to create a new workflow for parsing Botify project URLs and exporting CSV data via the Botify API, while preserving the codebase’s integrity and adhering to existing patterns.
Required Changes
What specific code changes are needed to implement the article’s requirements?
To meet the article’s requirements, a new workflow must be created to parse Botify URLs, store the parsed data, and prepare for subsequent API-driven CSV export steps. Here are the specific changes:
- Create a New Workflow File:
- File:
plugins/30_botify_csv_export.py
- Copy the structure from
plugins/20_hello_workflow.py
and modify it to suit the Botify CSV export task.
- File:
- Update Class and Constants:
- Rename the class to
BotifyCSVExportFlow
. - Set
APP_NAME = "botify_csv_export"
(unique identifier). - Set
DISPLAY_NAME = "Botify CSV Export"
(user-facing name). - Update
ENDPOINT_MESSAGE
to:"This workflow exports CSV data from Botify. Enter a project URL to begin."
- Set
TRAINING_PROMPT = "botify_csv_export.md"
(to be created intraining/
with workflow-specific guidance).
- Rename the class to
- Define Workflow Steps:
- Replace the steps in the
__init__
method with: - Replace the steps in the
__init__
method with:steps = [ Step(id='step_01', done='url', show='Botify Project URL', refill=True), Step(id='step_02', done='csv_data', show='CSV Export', refill=False), Step(id='finalize', done='finalized', show='Finalize', refill=False) ]
steps = [ Step(id='step_01', done='url', show='Botify Project URL', refill=True), Step(id='step_02', done='csv_data', show='CSV Export', refill=False), Step(id='finalize', done='finalized', show='Finalize', refill=False) ]
step_01
: Collects and parses the Botify URL, storingurl
,org
, andproject
.step_02
: Placeholder for future API call and CSV export logic.finalize
: Locks the workflow as per the standard pattern.
- Replace the steps in the
- Add URL Parsing Function:
- Include the
parse_botify_url
function fromprecursors/temp.py
in the new file:from urllib.parse import urlparse def parse_botify_url(url: str) -> dict: parsed = urlparse(url) path_parts = parsed.path.strip('/').split('/') if len(path_parts) < 2: raise ValueError("Invalid Botify URL format") org = path_parts[0] project = path_parts[1] base_url = f"https://{parsed.netloc}/{org}/{project}/" return {"url": base_url, "org": org, "project": project}
- Include the
- Modify
step_01
Method:- Update to display a form requesting a Botify project URL.
- Pre-fill the input field with the canonicalized URL if available from state:
async def step_01(self, request): pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name step_id = "step_01" step_index = self.steps_indices[step_id] step = steps[step_index] next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else None pipeline_id = db.get("pipeline_id", "unknown") state = pip.read_state(pipeline_id) step_data = pip.get_step_data(pipeline_id, step_id, {}) user_val = step_data.get(step.done, "") if "finalize" in state and "finalized" in state["finalize"]: return Div(Card(f"🔒 {step.show}: {user_val}"), Div(id=next_step_id, hx_get=f"/{app_name}/{next_step_id}", hx_trigger="load")) if user_val and state.get("_revert_target") != step_id: return Div( pip.revert_control(step_id=step_id, app_name=app_name, message=f"{step.show}: {user_val}", steps=steps), Div(id=next_step_id, hx_get=f"/{app_name}/{next_step_id}", hx_trigger="load") ) else: display_value = user_val if (step.refill and user_val and self.PRESERVE_REFILL) else "" await self.message_queue.add(pip, self.step_messages[step_id]["input"], verbatim=True) return Div( Card( H4(f"{pip.fmt(step.id)}: Enter {step.show}"), Form( pip.wrap_with_inline_button( Input(type="text", name=step.done, value=display_value, placeholder="e.g., https://app.botify.com/org/project/", required=True, autofocus=True) ), hx_post=f"/{app_name}/{step.id}_submit", hx_target=f"#{step.id}" ) ), Div(id=next_step_id), id=step.id )
- Modify
step_01_submit
Method:- Parse the URL, store the dictionary, and re-populate the input with the canonicalized URL:
async def step_01_submit(self, request): pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name step_id = "step_01" step_index = self.steps_indices[step_id] step = steps[step_index] pipeline_id = db.get("pipeline_id", "unknown") form = await request.form() user_val = form.get(step.done, "") is_valid, error_msg, error_component = pip.validate_step_input(user_val, step.show) if not is_valid: return error_component try: parsed_data = self.parse_botify_url(user_val) processed_val = parsed_data["url"] await pip.update_step_state(pipeline_id, step_id, parsed_data, steps, clear_previous=False) await self.message_queue.add(pip, f"{step.show}: {processed_val}", verbatim=True) if pip.check_finalize_needed(step_index, steps): await self.message_queue.add(pip, "URL parsed. Proceed to export CSV or finalize.", verbatim=True) return pip.create_step_navigation(step_id, step_index, steps, app_name, processed_val) except ValueError as e: return P(f"Error: {str(e)}", style=pip.get_style("error"))
- Parse the URL, store the dictionary, and re-populate the input with the canonicalized URL:
- Update
step_02
Method (Placeholder):- Display parsed data as a preview:
async def step_02(self, request): pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name step_id = "step_02" step_index = self.steps_indices[step_id] step = steps[step_index] next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else None pipeline_id = db.get("pipeline_id", "unknown") state = pip.read_state(pipeline_id) step_data = pip.get_step_data(pipeline_id, step_id, {}) user_val = step_data.get(step.done, "") if "finalize" in state and "finalized" in state["finalize"]: return Div(Card(f"🔒 {step.show}: {user_val}"), Div(id=next_step_id, hx_get=f"/{app_name}/{next_step_id}", hx_trigger="load")) step_01_data = pip.get_step_data(pipeline_id, "step_01", {}) if step_01_data: await self.message_queue.add(pip, f"Org: {step_01_data['org']}, Project: {step_01_data['project']}", verbatim=True) return Div(Card(P("CSV export placeholder")), id=step.id)
- Display parsed data as a preview:
Which existing components need to be modified or extended?
- Pipulate Class (server.py): No direct modifications are needed as it already supports storing dictionaries per step. The existing state management logic handles this use case.
- HelloFlow (20_hello_workflow.py): Used as a template; no changes required to the original file.
- No Other Core Components: The plugin system and UI rendering (HTMX, FastHTML) remain unchanged.
What new components or patterns need to be introduced?
- New Workflow Class:
BotifyCSVExportFlow
as a standalone plugin. - URL Parsing Utility:
parse_botify_url
function, initially embedded in the workflow file (can be refactored to a utilities module later). - No New Patterns: The existing step-based workflow pattern with state management and HTMX-driven UI updates is sufficient.
Integration Strategy
How should these changes be integrated with existing functionality?
- Plugin System: Place the new file in
plugins/
with a numeric prefix (e.g.,30_
) to ensure discovery byserver.py
’sdiscover_plugin_files
function. The workflow will auto-register and appear in the navigation menu. - State Management: Leverage
Pipulate
’s existing JSON blob storage to save{"url": ..., "org": ..., "project": ...}
understep_01
. - UI Updates: Use HTMX for dynamic updates, consistent with the codebase’s server-rendered approach.
What existing patterns or conventions should be followed?
- Step Pattern: Use
Step
namedtuples and pairedstep_xx
/step_xx_submit
methods. - FastHTML Rendering: Return components like
Div
,Card
, andForm
withto_xml()
for HTTP responses. - HTMX Integration: Use
hx_post
,hx_target
, andhx_swap
attributes for seamless UI updates. - State Storage: Store data in the
pipeline
table’s JSON blob viaPipulate
helpers. - Nix Environment: Ensure any new dependencies (none currently) are added to
flake.nix
orrequirements.txt
.
How can we ensure backward compatibility?
- Independent Workflow: The new workflow operates as a standalone plugin, not affecting existing workflows or CRUD apps.
- No Core Changes: Avoid modifying
server.py
or other core files, relying on the plugin architecture. - Consistent Patterns: Adhere to existing conventions, ensuring the new workflow integrates seamlessly without breaking other functionality.
Implementation Plan
What’s the recommended order for implementing these changes?
- Create the Workflow File:
- Copy
20_hello_workflow.py
to30_botify_csv_export.py
. - Update class name, constants, and steps as outlined.
- Copy
- Implement URL Parsing:
- Add
parse_botify_url
to the new file with error handling.
- Add
- Modify
step_01
:- Update UI to request a Botify URL and pre-fill with canonicalized URL from state.
- Implement
step_01_submit
:- Parse the URL, store the dictionary, and handle errors.
- Add
step_02
Placeholder:- Display parsed data from
step_01
as a preview.
- Display parsed data from
- Test the Workflow:
- Verify parsing, state storage, and UI behavior (including revert functionality).
What are the key milestones or checkpoints?
- Milestone 1: URL parsing and storage working in
step_01
. - Milestone 2: Canonicalized URL re-populates correctly on revert.
- Milestone 3:
step_02
displays parsed data, setting the stage for API integration.
What potential challenges or risks need to be addressed?
- Error Handling: Invalid URLs must be caught and displayed gracefully to the user.
- State Management: Ensure the dictionary is stored and retrieved correctly from the JSON blob.
- UI Consistency: HTMX updates must align with the existing look and feel (e.g., PicoCSS styling).
- Future API Integration:
step_02
is a placeholder; plan for Botify API credentials and polling logic later.
By following this approach, the new BotifyCSVExportFlow
workflow will integrate seamlessly into Pipulate, meeting the article’s requirements while maintaining the framework’s architectural integrity and leveraging its established patterns.
Anthropic Claude 3.7 Built Into Cursor
I thought the o4-mini-high response was short. Claude 3.7 Sonnet just gave me an outline! In all fairness, I did this prompt through Cursor chat. When I tried through the Claude.AI site to keep the comparison apples-to-apples, it reported that the prompt is 86% over the length limit. So I was forced to do the Claude example through Cursor where I pay my $20/mo. I suspect the in-editor context may be responsible for the brevity, considering it may figure it will be involved in the implementation — whereas the others may figure their responses had to be 1-shot.
Claude 3.7 Sonnet:
Implementation Plan for Botify CSV Export Workflow
Required Changes
- Create a new workflow plugin file based on
hello_workflow.py
:- Rename to
botify_export.py
with increased numbering (e.g.,25_botify_export.py
) - Modify class name, APP_NAME, DISPLAY_NAME, and ENDPOINT_MESSAGE
- Create training file
botify_export.md
- Rename to
- Implement URL parsing step:
- Modify
step_01
to accept and validate Botify URLs - Store parsed URL, org, and project slugs in state
- Modify
- Implement API authentication step:
- Add a step for API key input
- Add secure handling of API credentials
- Implement CSV data export functionality:
- Add step for selecting analysis data
- Implement API calls to Botify
- Handle pagination and data processing
- Store CSV data for download
Integration Strategy
- Follow existing workflow patterns in
hello_workflow.py
:- Maintain the Step namedtuple structure
- Keep the same route registration approach
- Use the Pipulate helpers for state management
- Leverage NixOS environment for dependencies:
- Use the Nix shell for consistent execution environment
- Add required Python packages to flake.nix if needed
- Add appropriate error handling:
- Validate input at each step
- Provide clear error messages for API failures
- Implement graceful fallbacks
Implementation Steps
- Create the basic workflow structure
- Implement URL parsing functionality
- Add API authentication step
- Implement data export functionality
- Add download/save functionality
- Add comprehensive documentation
Let me know if you’d like me to proceed with implementing these changes or if you need clarification on any part of the plan.
This Is The Hump That I Have To Get Over
I now have the plans as set forth by FOUR DIFFERENT AIs regarding my next steps. I find it easier to go ask all of them for plans than it is to actually dig in and do the work. But we have 4 relatively smart things coming to quite a consensus that I can keep referring to now for next steps. The first one is obvious. I copy/paste `20_hello_workflow.py` to `50_botify_export.py`. And then I change the stuff at the top of the main class from:
```python
class HelloFlow:  # <-- CHANGE THIS to your new WorkFlow name
    APP_NAME = "hello"  # <-- CHANGE THIS to something no other workflow is using
    DISPLAY_NAME = "Hello World"  # <-- CHANGE THIS to value for User Interface
    ENDPOINT_MESSAGE = (  # <-- Shows when user switches to workflow landing page
        "This simple workflow demonstrates a basic Hello World example. "
        "Enter an ID to start or resume your workflow."
    )
    TRAINING_PROMPT = "hello_workflow.md"  # markdown file from /training or plain text
```
…to:
```python
class BotifyExport:
    APP_NAME = "botify_csv"
    DISPLAY_NAME = "Botify CSV Export"
    ENDPOINT_MESSAGE = (
        "This workflow helps you export data from Botify projects. "
        "Press Enter to start a new workflow or enter an existing key to resume. "
    )
    TRAINING_PROMPT = "botify_export.md"
```
Wow, changing the constants is the super-duper-easy part. And that's done. And I am duly impressed with the auto plugin registration magic of my own system. Wow, seeing `Botify CSV Export` just come alive on the dropdown menu as a fully working clone of the `Hello World` plugin is alone amazing. However, the next steps are critical. This is where Pipulate is really born, because navigating this step determines how workable slamming out new workflow instances from existing templates really is. Either it's too high-friction even for its creator and the AI assistants helping me, or it's navigable, relatively easy in the first place, and forever easier over time.
There will be kooky customization. This is why Workflows are WET (write everything twice / we enjoy typing). The sort of default-behavior overrides I am about to embark on are mind-numbing to me when I am overriding behaviors inherited from object-oriented programming super-classes. There are no super-classes here, and so what you see is what you get. I say that because something that is normally a single key/value pair is about to change. I look through the recommendations from the different AIs to see which of them acknowledge this issue and what each says about `update_step_state`.
Only ChatGPT o4-mini-high calls it out explicitly for what it is:
By default `pip.update_step_state` only stores one key/value per step. To record three fields…
Gemini doesn’t call it out so blatantly, but does spot it and provide a solution:
Immediately after, fetch the current state, update the
step_01
dictionary to include theorg
andproject
, and write it back…
Perhaps Grok 3’s answer is the most interesting. While it doesn’t call it out like o4-mini-high, nor provide a less-than optimally elegant call-back approach like Gemini, Grok proactively writes a function and puts it right up-front in its reply:
Include the
parse_botify_url
function fromprecursors/temp.py
in the new file (calledparsed_data
)…
…and then it uses that function’s returned value in-location for a 1-shot update:
await pip.update_step_state(pipeline_id, step_id, parsed_data, steps, clear_previous=False)
Now this is not to say that Grok 3’s response was objectively best. ChatGPT explicitly called out the issue, and Gemini also uses the same parsing function as Grok to achieve much the same thing. It’s just the way Grok 3 says it. It’s so explicit (says someone who constantly meanders). I can follow the instructions. So, let’s do it.
First, we add:
```python
from urllib.parse import urlparse
```

…to the top of `50_botify_export.py`. Then I put in the function:
```python
def parse_botify_url(url: str) -> dict:
    parsed = urlparse(url)
    path_parts = parsed.path.strip('/').split('/')
    if len(path_parts) < 2:
        raise ValueError("Invalid Botify URL format")
    org = path_parts[0]
    project = path_parts[1]
    base_url = f"https://{parsed.netloc}/{org}/{project}/"
    return {"url": base_url, "org": org, "project": project}
```
Alright, now we need to change the form language and field names. Grok 3 tells me to:
- Replace the steps in the `__init__` method with:

```python
steps = [
    Step(id='step_01', done='url', show='Botify Project URL', refill=True),
    Step(id='step_02', done='csv_data', show='CSV Export', refill=False),
    Step(id='finalize', done='finalized', show='Finalize', refill=False)
]
```
…which I do; however, it misses that the finalize step is always added later automatically, so it got that a bit wrong. So I modify the instruction accordingly.
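A sketch of the adjusted steps list, dropping the finalize entry Grok included, since the workflow machinery appends that one on its own (`Step` being the namedtuple the template already uses, and field names kept as Grok suggested at this stage):

```python
# Sketch: steps list without the finalize entry, which Pipulate adds automatically.
steps = [
    Step(id='step_01', done='url', show='Botify Project URL', refill=True),
    Step(id='step_02', done='csv_data', show='CSV Export', refill=False),
]
```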
The next steps become very meticulous and detailed, to a degree that should be in the sweet spot for AI code assistants to help with. I try going step-by-step manually, but I get it wrong. So I feed the entire article thus far, with all the different AI recommendations and all, to Cursor AI default. It nails it. I ask it to break down how and why:
Cursor AI: The key insight came from understanding the article’s explanation of how Pipulate’s workflow state is structured, particularly this part:
While `step_01` only has `{"name": "Mike"}`, there's no reason it couldn't have: `{"url": "https://app.botify.com/example-org/example.com/", "org": "example-org", "project": "example.com"}`
This showed that while the `Step` namedtuple's `done` field typically stores a single value, the actual state storage in the JSON blob can hold multiple related fields under the same step.
The solution I implemented:
- Uses `parse_botify_url` to extract all components (url, org, project) from any valid Botify URL
- Instead of just storing the canonical URL, it stores the entire parsed dictionary using `state[step_id].update(parsed_data)`
- Ensures the `done` field (`step.done = 'url'`) still gets set correctly with `state[step_id][step.done] = parsed_data['url']`
- Uses `pip.write_state()` to save everything at once rather than using `pip.update_step_state()`
This approach:
- Maintains the workflow's step completion logic (via the `done` field)
- Stores all parsed components for use in later steps
- Automatically handles URL canonicalization
- Preserves the WET (Write Everything Twice) philosophy by keeping the logic self-contained
The database record you shared shows it working perfectly - all three components are stored under `step_01`, while still maintaining the overall workflow state structure with `created`, `updated`, and `finalize` fields.
This solution aligns with the article’s emphasis on keeping things simple and observable, while still being flexible enough to store the additional data needed for the Botify CSV export workflow.
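Pulling that quoted explanation together, a minimal sketch of the multi-field state write might look like the following; the `pip` helper names and `parse_botify_url` are the ones already discussed above, and the exact handler code Cursor wrote is not reproduced here.

```python
# Sketch: store url/org/project under one step while still marking it 'done'.
def store_parsed_url(pip, pipeline_id: str, step_id: str, done_field: str, raw_url: str) -> dict:
    parsed_data = parse_botify_url(raw_url)            # {'url': ..., 'org': ..., 'project': ...}
    state = pip.read_state(pipeline_id)
    state.setdefault(step_id, {}).update(parsed_data)  # all three fields live under step_01
    state[step_id][done_field] = parsed_data['url']    # done_field == 'url' marks completion
    pip.write_state(pipeline_id, state)
    return parsed_data
```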
Okay, so all that for step 1 complete: canonicalized Botify URLs and their
accompanying org and project slugs! Haha! Alright wow, so let’s forge on ahead.
The next part is going to be true API integration. There is a file stored on the drive, relative to where the code is run from, called `botify_token.txt`. Don't get hung up on security; this is a local-first, single-tenant app running a lot like how an Electron app would. And this file was plopped in place by a prior step, and you can rely on it being there. It follows this format:
```
****************************************
# username: first.last
```
…where the asterisks stand for the actual characters of the API token. And to actually download a CSV from Botify, we're going to need one more piece of information: their most recent crawl analysis slug, which can be gotten with a function such as this (httpx, and aiohttp if you prefer, are already pip installed and available in the environment):
```python
import httpx


def fetch_analyses(org, project):
    """Fetch analysis slugs for a given project from Botify API."""
    # api_key is assumed to have been read from botify_token.txt beforehand
    url = f"https://api.botify.com/v1/analyses/{org}/{project}/light"
    headers = {"Authorization": f"Token {api_key}"}
    slugs = []
    try:
        while url:
            response = httpx.get(url, headers=headers)
            response.raise_for_status()
            data = response.json()
            slugs.extend(a['slug'] for a in data.get('results', []))
            url = data.get('next')
        return slugs
    except httpx.RequestError as e:
        print(f"Error fetching analyses: {e}")
        return []
```
…with sample output:
```
20240301
20240201
20240101-2
20240101
20231201
20231101
```
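Since `fetch_analyses` assumes an `api_key` is already in hand, here is a sketch of how the `botify_token.txt` file described above might be read. The function name matches the `read_api_token` helper mentioned later in the Cursor summary, but this exact implementation is an assumption.

```python
# Sketch: read the API token from botify_token.txt
# (first line is the token, second line is the "# username:" comment).
from pathlib import Path


def read_api_token(path: str = "botify_token.txt") -> str:
    lines = Path(path).read_text().splitlines()
    return lines[0].strip() if lines else ""
```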
As our next step, I would like to simply replace Step 2’s “Enter CSV Export” with the newest item from the returned list.
That last prompt caused the AI to do the API call effectively but, having acquired the analysis slug value, to skip over step 2. So the next prompt:
Me: Very close, but I had to reject it because you didn’t present step 2 as a normal text field. You sort of filled it in and cascaded over it. The idea is that we’re setting the stage for creating a dropdown menu of all the available analysis slugs with the most recent one or previously selected one pre-selected on a dropdown menu. But I want to do each step progressively, keeping the web input fields in place. So instead of effectively automating and skipping over the step, simply pre-fill the value in the form field as it currently exists in this now reverted state. Don’t worry about the dropdown yet. We’ll get to that next.
Cursor AI: [Nails it]
Me: Okay, this is most excellent. You really nailed it. Now please summarize the difference between this file @20_hello_workflow.py and this file @50_botify_export.py. In other words, write a little article talking about how we have customized the template to be this custom workflow (so far).
Cursor AI: Here’s an analysis of how we transformed the Hello World template into the Botify CSV Export workflow:
From Hello World to Botify CSV Export: Anatomy of a Pipulate Workflow Customization
Core Changes
- Basic Configuration
  - Renamed the class from `HelloFlow` to `BotifyExport`
  - Changed `APP_NAME` from "hello" to "botify_csv"
  - Updated `DISPLAY_NAME` from "Hello World" to "Botify CSV Export"
  - Modified `ENDPOINT_MESSAGE` to describe the Botify-specific workflow purpose
  - Changed `TRAINING_PROMPT` to point to "botify_export.md"
- Added Dependencies
  - Imported `httpx` for API calls
  - Added `Path` from pathlib for file handling
  - Kept the original imports but added `urlparse` from urllib.parse
- URL Parsing Functionality
- Added the
parse_botify_url
function to extract organization and project slugs - Added API-related helper methods
read_api_token
andfetch_analyses
- Added the
- Step Definition Changes
```python
# Hello World steps:
Step(id='step_01', done='name', show='Your Name', refill=True)
Step(id='step_02', done='greeting', show='Hello Message', refill=False)

# Botify Export steps:
Step(id='step_01', done='url', show='Botify Project URL', refill=True)
Step(id='step_02', done='csv_data', show='CSV Export', refill=False)
```
Key Workflow Differences
Step 1: Enhanced State Management
- Hello World: Stores a single value (`name`) in the state
- Botify Export: Stores multiple related values in the state:
"step_01": { "url": "https://app.botify.com/org/project/", "org": "org", "project": "project" }
Step 2: API Integration
- Hello World: Simple transform of step 1’s value using an f-string
- Botify Export:
- Reads API token from file
- Makes authenticated API calls to Botify
- Pre-fills form with the most recent analysis slug
- Maintains error handling for API calls
State Management Evolution
- Hello World: Uses pip.update_step_state for simple key-value storage
- Botify Export: Uses direct state manipulation with pip.read_state and pip.write_state to store multiple values while maintaining the done field convention
Preserved Patterns
- Form Structure
- Both maintain the same HTMX-powered form pattern
- Keep the standard input field UI components
- Use the same revert/finalize mechanisms
- Message System
- Both use the message queue for user feedback
- Maintain the same step completion messages pattern
- Navigation Flow
- Preserve the step-by-step progression
- Keep the same revert functionality
- Maintain the finalization process
Key Innovations
- Multi-field State
- Demonstrated how a single step can store multiple related data points
  - Showed how to maintain the done field convention while storing additional data
- API Integration
- Added API token handling
- Implemented async HTTP calls
- Added error handling for external services
- Value Pre-filling
- Enhanced the display value logic to show API-fetched data
- Maintained the ability to modify pre-filled values
This customization shows how Pipulate’s “WET” philosophy allows for significant workflow customization while maintaining a consistent user experience and predictable development pattern. The template provides a solid foundation that can be extended without breaking the core workflow mechanics.
Adding Dropdown Menu For Analysis
All this work I’ve done before. More than once, in fact. But I want each subsequent version to be the cleanest implementation yet, so I am always hesitant to bring the code from the old implementation into the picture. I don’t want the AIs to become too lazy transposing code verbatim. But it really does capture a lot of important nuance, like with this next dropdown menu step.
If a value has not been set for the Analysis field yet, it should automatically select the most recent item in the list. However, if one has been selected before, and we are for example reverting back to that step, the dropdown menu should be pre-selected with the previously selected value. Makes sense, right?
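To make the intended behavior concrete, here is a minimal sketch of that selection rule as a standalone function (the helper name and signature are mine, not part of the plugin):

def choose_selected_slug(previous_choice, slugs):
    """Return the analysis slug to pre-select in the dropdown."""
    # Keep an earlier selection if it is still among the available slugs,
    # otherwise default to the newest analysis (first item in the list).
    if previous_choice and previous_choice in slugs:
        return previous_choice
    return slugs[0] if slugs else None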
Now this is part of why Workflows are WET. The kinds of things you might do here with dropdown menus, such as setting defaults or displaying one value while submitting another, are endless. Frameworks turn that into a wheel-spinning exercise of inventing your own APIs and syntax for these things, base classes for every field type and the like. But with the WET approach, we just do it right here with Python, FastHTML and HTMX. What you see is what you get. It’s explicit. No mysterious inheritance and overrides. Just build it.
For step 2, the csv_data field should be called analysis. And instead of a text field, it should be a PicoCSS style dropdown menu, which I believe technically is a select with options field. The options should be all the analysis choices from the list, preserving their original order, the newest (first) selected if there has not been a choice made already. On revert to step 2 button, it should be pre-selected with whatever the last selected analysis was.
Cursor AI: I’ll help modify step 2 to use a dropdown menu for analysis selection. First, let’s update the step definition to use analysis instead of csv_data, and then implement the dropdown with proper selection handling:
# pipulate/plugins/50_botify_export.py
# In __init__, update the steps definition:
steps = [
Step(id='step_01', done='url', show='Botify Project URL', refill=True),
Step(id='step_02', done='analysis', show='Analysis Selection', refill=True), # Changed from csv_data
]
# Then update step_02:
async def step_02(self, request):
"""
Display form for analysis slug selection using a dropdown menu.
Pre-selects the most recent analysis if no previous selection exists,
otherwise maintains the user's previous selection.
"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
step_id = "step_02"
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else None
pipeline_id = db.get("pipeline_id", "unknown")
state = pip.read_state(pipeline_id)
step_data = pip.get_step_data(pipeline_id, step_id, {})
user_val = step_data.get(step.done, "") # Previously selected analysis, if any
# Handle finalized state
finalize_data = pip.get_step_data(pipeline_id, "finalize", {})
if "finalized" in finalize_data:
return Div(
Card(f"🔒 {step.show}: {user_val}"),
Div(id=next_step_id, hx_get=f"/{self.app_name}/{next_step_id}", hx_trigger="load")
)
# If step is complete and not being reverted, show revert control
if user_val and state.get("_revert_target") != step_id:
return Div(
pip.revert_control(step_id=step_id, app_name=app_name, message=f"{step.show}: {user_val}", steps=steps),
Div(id=next_step_id, hx_get=f"/{app_name}/{next_step_id}", hx_trigger="load")
)
# Get data from step_01
step_01_data = pip.get_step_data(pipeline_id, "step_01", {})
org = step_01_data.get('org')
project = step_01_data.get('project')
try:
# Read API token and fetch analyses
api_token = self.read_api_token()
slugs = await self.fetch_analyses(org, project, api_token)
if not slugs:
return P("No analyses found for this project", style=pip.get_style("error"))
# Determine selected value:
# - Use previous selection if it exists and we're reverting
# - Otherwise use the most recent analysis (first in list)
selected_value = user_val if user_val else slugs[0]
# Show the form with dropdown
await self.message_queue.add(pip, self.step_messages[step_id]["input"], verbatim=True)
return Div(
Card(
H4(f"{pip.fmt(step_id)}: Select {step.show}"),
Form(
pip.wrap_with_inline_button(
Select(
name=step.done,
required=True,
autofocus=True,
*[
Option(
slug,
value=slug,
selected=(slug == selected_value)
) for slug in slugs
]
)
),
hx_post=f"/{app_name}/{step.id}_submit",
hx_target=f"#{step.id}"
)
),
Div(id=next_step_id),
id=step.id
)
except Exception as e:
return P(f"Error fetching analyses: {str(e)}", style=pip.get_style("error"))
Key changes:
- Changed step 2’s done field from csv_data to analysis
- Replaced Input with Select and Option elements
- Added logic to handle selection state:
  - Uses previous selection if it exists (for revert)
  - Defaults to most recent analysis if no previous selection
- Maintains all existing workflow patterns and error handling
- Uses PicoCSS styling (which is automatically applied to select elements)
The dropdown will:
- Show all available analyses in their original order
- Pre-select the previously chosen value when reverting
- Default to the most recent analysis for new selections
- Maintain the same HTMX-powered form submission pattern
Would you like me to explain any part of this implementation in more detail?
Ensuring Clear-Forward When Stepping Back In Workflows
Wow, this is really amazing! I don’t think I can overstate how useful it’s going to be to have a really well documented, really clean implementation of all this. And everything so far has been super-generic, able to be applied to all sorts of Botify CSV exports. It hasn’t been customized into the particular query yet. That will come soon enough. But before I do, I have to make sure that the “clear forward” behavior that’s intended to be in workflows is implemented and active.
It’s easy to lose sight of because it’s a bit counter-intuitive. Why would you want to lose data if you jump back in a workflow? Well, the answer is that the choice of analysis slugs available for each Botify Project URL might be different. So if you jump back to change your project URL, the selections you made for some other project URL are no longer valid. Their crawl analyses may have occurred on a different set of dates. So what I want you to do is read the README.md and .cursorrules again to see what they say about clearing forward, and implement that here in this workflow.
Please explain how this works.
Cursor AI: Here’s an explanation of what we discovered about Pipulate’s workflow step clearing mechanism:
Understanding Pipulate’s Clear-Forward Behavior
The Core Mechanism
Pipulate’s workflow system has a built-in “clear-forward” mechanism implemented in the clear_steps_from method. This method is designed to clear data from subsequent steps when earlier steps are modified. Here’s how it works:
async def clear_steps_from(self, pipeline_id: str, step_id: str, steps: list) -> dict:
state = self.read_state(pipeline_id)
    start_idx = next((i for i, step in enumerate(steps) if step.id == step_id), -1)
for step in steps[start_idx + 1:]:
if (not self.PRESERVE_REFILL or not step.refill) and step.id in state:
del state[step.id]
self.write_state(pipeline_id, state)
return state
The Refill Flag
The key to understanding the behavior lies in two interrelated settings:
- The refill parameter in the Step definition:

  Step(id='step_02', done='analysis', show='Analysis Selection', refill=False)

- The PRESERVE_REFILL class variable:

  class BotifyExport:
      PRESERVE_REFILL = True
How Clear-Forward Works
When a step needs to be cleared, the system checks this condition:
if (not self.PRESERVE_REFILL or not step.refill)
This means a step will be cleared if:
- Either PRESERVE_REFILL is False (globally disabling refill preservation)
- Or the step’s individual refill flag is False (step-specific control)
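To see that condition in action for this workflow, here is a quick check using the two steps’ settings as plain booleans (assuming PRESERVE_REFILL = True, as shown above):

PRESERVE_REFILL = True
step_01_refill = True    # Botify Project URL: kept when clearing forward
step_02_refill = False   # Analysis Selection: cleared when clearing forward

print(not PRESERVE_REFILL or not step_01_refill)  # False -> step_01 data is preserved
print(not PRESERVE_REFILL or not step_02_refill)  # True  -> step_02 data is deleted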
Why It Matters
In the Botify Export workflow:
- Step 1 (URL input) has refill=True because we might want to preserve the URL when reverting
- Step 2 (Analysis Selection) has refill=False because its data depends on Step 1’s URL
  - When the URL changes, the available analyses will be different
  - Therefore, any previously selected analysis should be cleared
Best Practices
- Use refill=True for:
  - Independent inputs that could be reused
  - Steps where preserving data makes sense even if earlier steps change
- Use refill=False for:
  - Dependent data that must be refreshed when parent data changes
  - Steps where previous selections become invalid after changes
- The PRESERVE_REFILL flag provides global control, but individual step refill flags offer fine-grained control
This implementation ensures data consistency while providing flexibility in how step data is preserved or cleared during workflow navigation.
Would you like me to elaborate on any part of this explanation?
Botify Query Language (BQL)
That’s perfect, thanks. We want to get to the CSV download, but there’s a few prior steps. First, the API download limit is 1-million rows and many of the CSV downloads we can attempt to initiate are much larger than that. So before we create an export request, receive the download link and do the polling routine until it’s ready, we are going to do a simplified version of the process against the list API. There are 2 parts to this API: lists and exports. There’s a whole lot of nuance to each, so we’re going to build it up from the simplest case.
We will be using the Botify Query Language, or BQL. There is an older BQLv1, used less often these days but still required in particular cases that we will cover later. For now we will limit ourselves to BQLv2, the newer version of the API, designed to hit more tables, do fancier joins on the backend, and the like. It’s Google BigQuery underneath, so BQL is a query language layered on top of that. We must work entirely within BQL.
How To Get 500 URLs
If you just wanted to grab 500 URLs, you could do it like so:
import httpx
import pandas as pd
api_key = open('botify_token.txt').read().strip()
org = "example-org"
project = "example.com"
analysis = "20250413"
def get_bqlv2_data(org, project, analysis, api_key):
"""Fetch data based on BQLv2 query for a specific Botify analysis."""
url = f"https://api.botify.com/v1/projects/{org}/{project}/query"
# BQLv2 query payload
data_payload = {
"collections": [f"crawl.{analysis}"],
"query": {
"dimensions": [
f"crawl.{analysis}.url"
],
"metrics": [] # Don't come crying to me when you delete this and it stops working.
}
}
headers = {"Authorization": f"Token {api_key}", "Content-Type": "application/json"}
# Send the request
response = httpx.post(url, headers=headers, json=data_payload, timeout=60.0)
response.raise_for_status() # Check for errors
return response.json()
# Run the query and load results
data = get_bqlv2_data(org, project, analysis, api_key)
list_of_urls = [url['dimensions'][0] for url in data['results']]
And there’s your basic example. But that’s not a CSV download. That’s just
grabbing a list of 500 URLs. In fact, the list version of the API is not the one
we use for downloads, though it is useful for aggregate queries so that we can
“size up” the download we’re about to request. So for example, let’s say we
wanted to get the total count of URLs on a site per the Botify crawler data.
Also, let’s assume a config.json file for configuration:
{
"org": "your-organization",
"project": "your-project-slug",
"analysis": "your-analysis-slug"
}
Get The Total Count of Crawled URLs
import json
import httpx
# --- 1. Configuration ---
# Load necessary details from files (adjust paths if needed)
# Basic error handling for loading is good practice even in tutorials
try:
config = json.load(open("config.json"))
api_key = open('botify_token.txt').read().strip()
# Get specific config values needed
ORG_SLUG = config['org']
PROJECT_SLUG = config['project']
ANALYSIS_SLUG = config['analysis']
except FileNotFoundError:
print("Error: 'config.json' or 'botify_token.txt' not found.")
exit()
except KeyError as e:
print(f"Error: Key '{e}' not found in 'config.json'.")
exit()
except Exception as e:
print(f"Error loading configuration: {e}")
exit()
# TARGET_MAX_DEPTH is no longer needed for this calculation
# --- 2. Function to Get URL Counts Per Depth ---
def get_all_urls_by_depth(org, project, analysis, key):
"""
Fetches URL counts for ALL depths from the Botify API.
Returns a dictionary {depth: count} or None on error.
"""
api_url = f"https://api.botify.com/v1/projects/{org}/{project}/query"
headers = {"Authorization": f"Token {key}", "Content-Type": "application/json"}
# This payload asks for counts grouped by depth, for the entire crawl
payload = {
"collections": [f"crawl.{analysis}"],
"query": {
"dimensions": [f"crawl.{analysis}.depth"],
"metrics": [{"field": f"crawl.{analysis}.count_urls_crawl"}],
"sort": [{"field": f"crawl.{analysis}.depth", "order": "asc"}]
}
}
try:
print("Requesting total URL count of site from Botify API...")
response = httpx.post(api_url, headers=headers, json=payload, timeout=120.0)
response.raise_for_status() # Check for HTTP errors (like 4xx, 5xx)
print("Data received successfully.")
# Convert the response list into a more usable {depth: count} dictionary
results = response.json().get("results", [])
depth_counts = {
row["dimensions"][0]: row["metrics"][0]
for row in results
if "dimensions" in row and len(row["dimensions"]) > 0 and \
"metrics" in row and len(row["metrics"]) > 0 and \
isinstance(row["dimensions"][0], int) # Ensure depth is an int
}
return depth_counts
except httpx.HTTPStatusError as e:
print(f"API Error: {e.response.status_code} - {e.response.text}")
return None # Indicate failure
except Exception as e:
print(f"An error occurred during API call or processing: {e}")
return None # Indicate failure
# --- 3. Main Calculation (Grand Total) ---
# Get the depth data using the function
all_depth_data = get_all_urls_by_depth(ORG_SLUG, PROJECT_SLUG, ANALYSIS_SLUG, api_key)
# Proceed only if we got data back
if all_depth_data is not None:
# Calculate the grand total by summing all counts
grand_total_urls = 0
print(f"\nCalculating the grand total number of URLs from all depths...")
# Loop through all counts returned in the dictionary values and add them up
for count in all_depth_data.values():
grand_total_urls += count
# --- Alternatively, you could use the sum() function directly: ---
# grand_total_urls = sum(all_depth_data.values())
# ----------------------------------------------------------------
# Print the final result
print(f"\nResult: Grand Total URLs in Crawl = {grand_total_urls:,}")
else:
print("\nCould not calculate total because API data retrieval failed.")
Sample output:
Requesting total URL count of site from Botify API...
Data received successfully.
Calculating the grand total number of URLs from all depths...
Result: Grand Total URLs in Crawl = 3,000,000
So as you can see, the total number of URLs is 3x as many as can be downloaded.
Getting Maximum Click Depth With Under 1-Million Cumulative URLs
Now let’s say we wanted to check the cumulative number of URLs at each crawl depth-level. This assumes that the website crawler started on the homepage (depth 0), and all links directly from the homepage are depth 1, and so on. Getting a dictionary of the cumulative depths at those different levels looks like this:
import json
import httpx
import pprint # Keep pprint for the final output
# --- 1. Configuration ---
# Load necessary details from files (adjust paths if needed)
try:
config = json.load(open("config.json"))
api_key = open('botify_token.txt').read().strip()
# Get specific config values needed
ORG_SLUG = config['org']
PROJECT_SLUG = config['project']
ANALYSIS_SLUG = config['analysis']
print("Configuration loaded successfully.")
except FileNotFoundError:
print("Error: 'config.json' or 'botify_token.txt' not found.")
exit(1)
except KeyError as e:
print(f"Error: Key '{e}' not found in 'config.json'.")
exit(1)
except Exception as e:
print(f"Error loading configuration: {e}")
exit(1)
# --- 2. Function to Get URL Counts Per Depth ---
# (This function remains unchanged as it fetches the base data needed)
def get_urls_by_depth(org, project, analysis, key):
"""
Fetches URL counts aggregated by depth from the Botify API.
Returns a dictionary {depth: count} or an empty {} on error.
"""
api_url = f"https://api.botify.com/v1/projects/{org}/{project}/query"
headers = {"Authorization": f"Token {key}", "Content-Type": "application/json"}
payload = {
"collections": [f"crawl.{analysis}"],
"query": {
"dimensions": [f"crawl.{analysis}.depth"],
"metrics": [{"field": f"crawl.{analysis}.count_urls_crawl"}],
"sort": [{"field": f"crawl.{analysis}.depth", "order": "asc"}]
}
}
try:
print("Requesting URL counts per depth from Botify API...")
response = httpx.post(api_url, headers=headers, json=payload, timeout=120.0)
response.raise_for_status()
print("Data received successfully.")
response_data = response.json()
results = response_data.get("results", [])
depth_distribution = {}
print("Processing API response...")
for row in results:
if "dimensions" in row and len(row["dimensions"]) == 1 and \
"metrics" in row and len(row["metrics"]) == 1:
depth = row["dimensions"][0]
count = row["metrics"][0]
if isinstance(depth, int):
depth_distribution[depth] = count
else:
print(f"Warning: Skipping row with non-integer depth: {row}")
else:
print(f"Warning: Skipping row with unexpected structure: {row}")
print("Processing complete.")
return depth_distribution
except httpx.ReadTimeout:
print("Request timed out after 120 seconds.")
return {}
except httpx.HTTPStatusError as e:
print(f"API Error: {e.response.status_code} - {e.response.reason_phrase}")
try:
error_details = e.response.json()
print(f"Error details: {error_details}")
except json.JSONDecodeError:
print(f"Response content: {e.response.text}")
return {}
except httpx.RequestError as e:
print(f"An error occurred during the request: {e}")
return {}
except (KeyError, IndexError, TypeError, json.JSONDecodeError) as e:
print(f"Error processing API response: {e}")
if 'response' in locals():
print(f"Response Text: {response.text}")
return {}
except Exception as e:
print(f"An unexpected error occurred in get_urls_by_depth: {e}")
return {}
# --- 3. Main Execution ---
print("\nStarting script execution...")
try:
# Call the function to get the dictionary {depth: count_at_depth}
depth_distribution_result = get_urls_by_depth(
ORG_SLUG, PROJECT_SLUG, ANALYSIS_SLUG, api_key
)
# Check if the result is not an empty dictionary (which indicates an error)
if depth_distribution_result: # A non-empty dict evaluates to True
print("\nCalculating cumulative URL counts by depth...")
# --- Calculate Cumulative Distribution ---
cumulative_depth_distribution = {}
current_cumulative_sum = 0
# Get the depths present and sort them to process in order (0, 1, 2...)
# Handle case where result might be empty just in case, though checked above
sorted_depths = sorted(depth_distribution_result.keys())
max_depth_found = sorted_depths[-1] if sorted_depths else -1
# Iterate from depth 0 up to the maximum depth found in the results
for depth_level in range(max_depth_found + 1):
# Get the count for this specific depth_level from the original results.
# Use .get(depth, 0) in case a depth level has no URLs (e.g., depth 0 might be missing if start page redirected)
count_at_this_level = depth_distribution_result.get(depth_level, 0)
# Add this level's count to the running cumulative sum
current_cumulative_sum += count_at_this_level
# Store the *cumulative* sum in the new dictionary for this depth level
cumulative_depth_distribution[depth_level] = current_cumulative_sum
# --- End Calculation ---
print("\n--- Cumulative URL Distribution by Depth (URLs <= Depth) ---")
# Use pprint for formatted output of the NEW cumulative dictionary
pprint.pprint(cumulative_depth_distribution, width=1)
print("----------------------------------------------------------")
else:
# This message prints if the function returned {}
print("\nRetrieved empty distribution. Cannot calculate cumulative counts.")
except Exception as e:
# Catch any unexpected errors during the main execution sequence
print(f"An unexpected error occurred during the main execution: {e}")
Sample Output:
Configuration loaded successfully.
Starting script execution...
Requesting URL counts per depth from Botify API...
Data received successfully.
Processing API response...
Processing complete.
Calculating cumulative URL counts by depth...
--- Cumulative URL Distribution by Depth (Scaled Generic Example) ---
{ 0: 10,
1: 560,
2: 28560,
3: 303560,
4: 1128560,
5: 1788560,
6: 2228560,
7: 2503560,
8: 2668560,
9: 2778560,
10: 2833560,
11: 2877560,
12: 2910560,
13: 2938560,
14: 2960560,
15: 2977560,
16: 2988560,
17: 2994560,
18: 2997560,
19: 2998660,
20: 2999210 }
------------------------------------------------------------------
Given this dictionary, we would know that click-depth of 3 is the maximum export we should ask for (given click-depth being our filtering criteria).
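For illustration, here is a minimal sketch of how that threshold could be read off the cumulative dictionary (the function name and the 1,000,000 default are mine):

def max_depth_under_limit(cumulative_counts, limit=1_000_000):
    """Return the deepest click-depth whose cumulative URL count stays under the limit."""
    safe_depth = None
    for depth in sorted(cumulative_counts):
        if cumulative_counts[depth] < limit:
            safe_depth = depth
        else:
            break
    return safe_depth

# With the sample dictionary above, this returns 3 (303,560 URLs),
# since depth 4 would already exceed 1 million (1,128,560 URLs).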
And consequently, we use the list API mode of BQLv2 to get the click-depth parameter we will use in preparation for our data export. We have intentionally not yet discussed the data export part of the API so as to not confuse things. I want to proceed logically from using the list API for listing (the first 500 URL example), then moving onto using it for its aggregate ability, so as to understand the necessity of the list API in preparation for CSV exporting.
Is all that clear?
So we have 2 challenges now. We need to add another step (maybe I should have separated that out entirely from this complex BQL stuff). And we have the fact that we should calculate the maximum click-depth and get all the final parameters in order for the export on that step. So for the proposed step 3 we just show a text input field with the maximum click depth that is under 1-million cumulative URLs.
Everything is in such a wonderfully balanced state. Everything is more subject to breaking when you go off-pattern than you might think. The FastHTML Pipulate framework and the template patterns are very fragile. There is not a lot of room for creativity. There is only “splicing in” other steps using the exact same pattern as exists in the template.
Likewise, the Botify BQL API is extremely fragile. Many of the liberties that you think you may be able to take with it, you can’t. Here’s a little bit of what Gemini had to say in preparing the examples…
Indeed, our earlier attempts to directly manipulate the BQLv2 query payload for the /query endpoint underscored this fragility. The initial assumption, based on common API patterns and hints about available operators, was that we could simply add a filter parameter to the query object to restrict results based on conditions like depth <= X, or perhaps even remove the dimensions parameter entirely to get a single aggregated result. Both assumptions proved incorrect. The API responded with clear 400 Bad Request errors, first insisting that the dimensions field was mandatory, and subsequently rejecting the filter key as an invalid field within the query object’s top level.
This demonstrates that this specific Botify API endpoint requires a precise payload structure. It’s not designed to dynamically change its aggregation behavior based on the presence or absence of fields like dimensions, nor does it accept filtering parameters in the standard way we first attempted. The API dictates the structure, and deviations lead to errors. The successful approach involved respecting these constraints: using the known-valid query structure to fetch the necessary data (counts per depth) and then performing the required calculations (like cumulative sums or finding the max depth under a threshold) client-side in Python. It highlights that sometimes, working with a specific API means adapting your logic to the data formats and structures the API will provide, rather than expecting the API to conform to more generalized patterns or perform complex transformations within a single request.
Plan our next steps, Claude!
Claude 3.7 Takes It All In
I understand we need to carefully add a new step to calculate and display the maximum click depth that keeps us under the 1-million URL export limit. Let me outline a plan that respects both the Pipulate workflow patterns and Botify API constraints:
Implementation Plan
- Add Step 3 to the Workflow
steps = [
    Step(id='step_01', done='url', show='Botify Project URL', refill=True),
    Step(id='step_02', done='analysis', show='Analysis Selection', refill=False),
    Step(id='step_03', done='depth', show='Maximum Click Depth', refill=False),
]
- Create Required Methods
  - step_03 - Display the calculated maximum depth
  - step_03_submit - Handle the depth confirmation
  - Add Botify API interaction methods to calculate depths
- BQL Integration
We’ll need to add the URL counting functionality you provided, but adapted to our class structure:
async def calculate_max_safe_depth(self, org, project, analysis, api_key):
    """Calculate maximum depth that keeps cumulative URLs under 1M"""
    depth_distribution = await self.get_urls_by_depth(org, project, analysis, api_key)
    if not depth_distribution:
        return None
    cumulative_sum = 0
    for depth in sorted(depth_distribution.keys()):
        cumulative_sum += depth_distribution[depth]
        if cumulative_sum >= 1_000_000:
            return depth - 1  # Return last safe depth
    return max(depth_distribution.keys())  # All depths are safe
- Step 3 Implementation
async def step_03(self, request):
    """Display the maximum safe click depth based on cumulative URL counts"""
    pip, db, steps = self.pipulate, self.db, self.steps
    step_id = "step_03"
    # Get data from previous steps
    step_01_data = pip.get_step_data(pipeline_id, "step_01", {})
    step_02_data = pip.get_step_data(pipeline_id, "step_02", {})
    org = step_01_data.get('org')
    project = step_01_data.get('project')
    analysis = step_02_data.get('analysis')
    try:
        api_token = self.read_api_token()
        max_depth = await self.calculate_max_safe_depth(org, project, analysis, api_token)
        return Div(
            Card(
                H4(f"{pip.fmt(step_id)}: Maximum Safe Click Depth"),
                P(f"Based on cumulative URL counts, the maximum safe depth is: {max_depth}"),
                Form(
                    Button("Confirm", type="submit", cls="primary"),
                    hx_post=f"/{app_name}/{step.id}_submit",
                    hx_target=f"#{step.id}"
                )
            ),
            id=step_id
        )
    except Exception as e:
        return P(f"Error calculating maximum depth: {str(e)}", style=pip.get_style("error"))
Key Considerations
- API Fragility
- Keep BQL queries exactly as provided in your examples
- Don’t attempt to modify the query structure
- Handle all aggregation and filtering in Python
- Workflow Pattern Adherence
- Maintain the Step namedtuple structure
  - Keep refill=False for step_03 since it depends on previous steps
  - Follow the established UI patterns with FastHTML
- Error Handling
- Robust error handling for API calls
- Clear user feedback for failures
- Graceful degradation if calculations fail
- State Management
- Store the calculated depth in the workflow state
- Clear this step if analysis selection changes
- Preserve the step pattern for revert functionality
Would you like me to proceed with implementing any specific part of this plan? Or should we adjust the approach based on additional considerations?
Claude Nails Implementation On The 1st Shot
Me: Make it so, Mr. Claude!
Claude: I’ll implement the changes to add Step 3 for the maximum click depth calculation. Let me edit the file:
# pipulate/plugins/50_botify_export.py
def __init__(self, app, pipulate, pipeline, db, app_name=APP_NAME):
# ... existing init code ...
steps = [
Step(id='step_01', done='url', show='Botify Project URL', refill=True),
Step(id='step_02', done='analysis', show='Analysis Selection', refill=False),
Step(id='step_03', done='depth', show='Maximum Click Depth', refill=False),
]
# ... rest of init code ...
async def get_urls_by_depth(self, org, project, analysis, api_key):
"""
Fetches URL counts aggregated by depth from the Botify API.
Returns a dictionary {depth: count} or an empty {} on error.
"""
api_url = f"https://api.botify.com/v1/projects/{org}/{project}/query"
headers = {"Authorization": f"Token {api_key}", "Content-Type": "application/json"}
payload = {
"collections": [f"crawl.{analysis}"],
"query": {
"dimensions": [f"crawl.{analysis}.depth"],
"metrics": [{"field": f"crawl.{analysis}.count_urls_crawl"}],
"sort": [{"field": f"crawl.{analysis}.depth", "order": "asc"}]
}
}
try:
async with httpx.AsyncClient() as client:
response = await client.post(api_url, headers=headers, json=payload, timeout=120.0)
response.raise_for_status()
results = response.json().get("results", [])
depth_distribution = {}
for row in results:
if "dimensions" in row and len(row["dimensions"]) == 1 and \
"metrics" in row and len(row["metrics"]) == 1:
depth = row["dimensions"][0]
count = row["metrics"][0]
if isinstance(depth, int):
depth_distribution[depth] = count
return depth_distribution
except Exception as e:
logger.error(f"Error fetching URL depths: {str(e)}")
return {}
async def calculate_max_safe_depth(self, org, project, analysis, api_key):
"""Calculate maximum depth that keeps cumulative URLs under 1M"""
depth_distribution = await self.get_urls_by_depth(org, project, analysis, api_key)
if not depth_distribution:
return None
cumulative_sum = 0
for depth in sorted(depth_distribution.keys()):
cumulative_sum += depth_distribution[depth]
if cumulative_sum >= 1_000_000:
return depth - 1 # Return last safe depth
return max(depth_distribution.keys()) # All depths are safe
async def step_03(self, request):
"""
Display the maximum safe click depth based on cumulative URL counts.
Shows both the calculated depth and the cumulative URL count at that depth.
"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
step_id = "step_03"
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else None
pipeline_id = db.get("pipeline_id", "unknown")
state = pip.read_state(pipeline_id)
step_data = pip.get_step_data(pipeline_id, step_id, {})
user_val = step_data.get(step.done, "")
# Handle finalized state
finalize_data = pip.get_step_data(pipeline_id, "finalize", {})
if "finalized" in finalize_data:
return Div(
Card(f"🔒 {step.show}: {user_val}"),
Div(id=next_step_id, hx_get=f"/{self.app_name}/{next_step_id}", hx_trigger="load")
)
# If step is complete and not being reverted, show revert control
if user_val and state.get("_revert_target") != step_id:
return Div(
pip.revert_control(step_id=step_id, app_name=app_name, message=f"{step.show}: {user_val}", steps=steps),
Div(id=next_step_id, hx_get=f"/{app_name}/{next_step_id}", hx_trigger="load")
)
# Get data from previous steps
step_01_data = pip.get_step_data(pipeline_id, "step_01", {})
step_02_data = pip.get_step_data(pipeline_id, "step_02", {})
org = step_01_data.get('org')
project = step_01_data.get('project')
analysis = step_02_data.get('analysis')
try:
api_token = self.read_api_token()
max_depth = await self.calculate_max_safe_depth(org, project, analysis, api_token)
if max_depth is None:
return P("Could not calculate maximum depth. Please check your API access and try again.",
style=pip.get_style("error"))
# Show the form with the calculated depth
await self.message_queue.add(pip, self.step_messages[step_id]["input"], verbatim=True)
return Div(
Card(
H4(f"{pip.fmt(step_id)}: {step.show}"),
P(f"Based on URL counts, the maximum safe depth is: {max_depth}",
style="margin-bottom: 1rem;"),
P("This depth ensures the export will contain fewer than 1 million URLs.",
style="font-size: 0.9em; color: #666;"),
Form(
pip.wrap_with_inline_button(
Input(
type="hidden",
name=step.done,
value=str(max_depth)
)
),
hx_post=f"/{app_name}/{step.id}_submit",
hx_target=f"#{step.id}"
)
),
Div(id=next_step_id),
id=step.id
)
except Exception as e:
return P(f"Error calculating maximum depth: {str(e)}", style=pip.get_style("error"))
async def step_03_submit(self, request):
"""Handle the submission of the maximum click depth step."""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
step_id = "step_03"
step_index = self.steps_indices[step_id]
step = steps[step_index]
pipeline_id = db.get("pipeline_id", "unknown")
# Handle finalized state using helper
if step.done == 'finalized':
return await pip.handle_finalized_step(pipeline_id, step_id, steps, app_name, self)
# Get form data
form = await request.form()
user_val = form.get(step.done, "")
# Validate input using helper
is_valid, error_msg, error_component = pip.validate_step_input(user_val, step.show)
if not is_valid:
return error_component
# Update state using helper
await pip.update_step_state(pipeline_id, step_id, user_val, steps)
# Send confirmation message
await self.message_queue.add(pip, f"{step.show}: {user_val}", verbatim=True)
# Check if we need finalize prompt
if pip.check_finalize_needed(step_index, steps):
await self.message_queue.add(pip, "All steps complete! Please press the Finalize button below to save your data.", verbatim=True)
# Return navigation controls
return pip.create_step_navigation(step_id, step_index, steps, app_name, user_val)
This implementation:
- Adds Step 3 to track maximum click depth
- Implements the BQL query functionality to get URL counts by depth
- Calculates the maximum safe depth that keeps us under 1M URLs
- Presents the result in a simple form that matches the Pipulate UI patterns
- Maintains all the workflow patterns (finalization, revert, etc.)
The step will:
- Calculate the maximum safe depth automatically
- Show it to the user with an explanation
- Use a hidden input since this is a calculated value
- Preserve all the workflow navigation and state management
Would you like me to explain any part of the implementation in more detail?
Me: I did not think that adding a new step at the same time as doing a BQL query was going to be 1-shot smooth. I am deliberately choosing Claude 3.7 Sonnet in Cursor AI for things like this where I don’t want to leave the model to chance, and I was not disappointed.
This is absolutely stunning, Claude. When you output:
Step 03: Maximum Click Depth
Based on URL counts, the maximum safe depth is: 100
This depth ensures the export will contain fewer than 1 million URLs.
…can you tell the user a little more, like how many URLs will be exported at that click-depth and how many cumulative URLs were found at the next click-depth?
Claude: [Does this handily]
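Claude’s actual edit isn’t reproduced here, but the extra detail amounts to something like this sketch (names and message wording are illustrative), pulling both numbers from the same cumulative dictionary:

def depth_summary(cumulative_counts, max_depth):
    """Describe the export size at the chosen depth and at the next depth down."""
    at_depth = cumulative_counts.get(max_depth, 0)
    at_next = cumulative_counts.get(max_depth + 1)
    msg = f"Exporting up to depth {max_depth} covers {at_depth:,} URLs."
    if at_next is not None:
        msg += f" Going one level deeper (depth {max_depth + 1}) would mean {at_next:,} URLs."
    return msg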
The Documentation Before The Storm
I’ve impaled myself on this aspect of the Botify API numerous times. As if the BQLv1 vs. BQLv2 split weren’t bad enough, within each there is also a list vs. export split, so I tap Gemini to help spell it out.
Here is documentation explaining the difference between the Botify API’s “list
mode” (/query
endpoint) and “export mode” (/jobs
endpoint), using the same
core BQLv2 query in examples for both. Let’s also integrate the polling and
downloading logic into the Export Mode documentation example. This gives a
complete picture of the asynchronous export workflow, from initiating the job to
getting the final CSV file.
Botify API: List Mode vs. Export Mode
This document illustrates the differences using the same underlying query in both modes.
The Common Query:
# Define the core BQLv2 query components we want to run
# We'll use this same logic inside both list and export requests
# This query now includes URL, Title, Meta Description, and H1(s)
common_bql_query = {
"dimensions": [
f"crawl.{analysis}.url", # Get the URL
f"crawl.{analysis}.metadata.title.content", # Get the Title
f"crawl.{analysis}.metadata.description.content", # Get the Meta Description
f"crawl.{analysis}.metadata.h1.contents" # Get the H1(s) (often a list)
],
"metrics": [], # Required, even if empty
"sort": [{"field": f"crawl.{analysis}.url", "order": "asc"}] # Optional sort
}
List Mode (/query Endpoint)
Example: Fetching the first 10 rows directly (with Title, Description, H1)
# ... (Code from previous List Mode example remains unchanged) ...
import json
import httpx
# --- Configuration ---
try:
config = json.load(open("config.json"))
api_key = open('botify_token.txt').read().strip()
org = config['org']
project = config['project']
analysis = config['analysis'] # Assumes analysis slug is in config
except Exception as e:
print(f"Error loading config: {e}")
exit()
headers = {
"Authorization": f"Token {api_key}",
"Content-Type": "application/json"
}
# --- List Mode API Call ---
api_url = f"https://api.botify.com/v1/projects/{org}/{project}/query"
common_bql_query_list = {
"dimensions": [
f"crawl.{analysis}.url",
f"crawl.{analysis}.metadata.title.content",
f"crawl.{analysis}.metadata.description.content",
f"crawl.{analysis}.metadata.h1.contents"
],
"metrics": [],
"sort": [{"field": f"crawl.{analysis}.url", "order": "asc"}]
}
list_payload = {
"collections": [f"crawl.{analysis}"],
"query": {
**common_bql_query_list,
"limit": 10
}
}
print(f"Requesting first 10 rows (URL, Title, Desc, H1) using List Mode ({api_url})...")
try:
response = httpx.post(api_url, headers=headers, json=list_payload, timeout=60.0)
response.raise_for_status()
data = response.json()
print("\n--- First 10 Results ---")
results = data.get("results", [])
if results:
for i, row in enumerate(results):
url, title, desc, h1s = row['dimensions'] # Unpack directly
print(f"\nResult {i+1}:")
print(f" URL: {url}")
print(f" Title: {title}")
print(f" Desc: {desc}")
print(f" H1s: {h1s}")
else:
print("No results found.")
except httpx.HTTPStatusError as e:
print(f"API Error: {e.response.status_code} - {e.response.text}")
except Exception as e:
print(f"An error occurred: {e}")
Export Mode (/jobs Endpoint)
Use Case: Fetching large datasets (up to 1M rows) for offline analysis, generating CSV files.
Behavior: Asynchronous - you submit a job request, receive a job URL, poll the job URL until completion, and then download the resulting file from a separate download URL.
Limits: Up to 1,000,000 rows per export job, defined by export_size.
Key Point: Organization (org) and Project (project) slugs are specified within the JSON payload.
Example: Initiating an export, polling for completion, and downloading the CSV (with Title, Description, H1)
import json
import httpx
import time # Needed for polling
from pathlib import Path # Needed for file paths
# --- Configuration (same as above) ---
try:
config = json.load(open("config.json"))
api_key = open('botify_token.txt').read().strip()
org = config['org']
project = config['project']
analysis = config['analysis']
except Exception as e:
print(f"Error loading config: {e}")
exit()
headers = {
"Authorization": f"Token {api_key}",
"Content-Type": "application/json"
}
# --- Helper Function for Downloading ---
def download_file(url: str, save_path: Path, headers: dict):
"""
Download a file from a URL using streaming and save it locally.
"""
print(f"\nAttempting to download file from {url}...")
try:
# Use a context manager for the client
with httpx.Client(headers=headers, follow_redirects=True) as client:
# Use streaming to handle potentially large files
with client.stream("GET", url, timeout=300.0) as response: # Increased timeout for download
response.raise_for_status() # Check download request status
save_path.parent.mkdir(parents=True, exist_ok=True) # Ensure directory exists
with open(save_path, "wb") as file:
for chunk in response.iter_bytes(chunk_size=8192):
file.write(chunk)
print(f"File successfully downloaded to '{save_path}'")
return True
except httpx.HTTPStatusError as e:
print(f"Download Error: {e.response.status_code} - {e.response.reason_phrase}")
# It might be useful to read the response text if possible
try:
print(f"Error details: {e.response.text}")
except Exception:
pass # Ignore if response text is not readable
return False
except Exception as e:
print(f"An error occurred during download: {e}")
return False
# --- Function to Initiate Export and Handle Download ---
def initiate_and_download_export(org, project, analysis, api_key, headers, save_dir="downloads"):
"""
Initiates a Botify export job, polls for completion, and downloads the result.
Returns the path to the downloaded file or None on failure.
"""
jobs_url = "https://api.botify.com/v1/jobs"
save_path = Path(save_dir) # Use pathlib for path handling
# Define the core BQLv2 query components (same as list example)
common_bql_query_export = {
"dimensions": [
f"crawl.{analysis}.url",
f"crawl.{analysis}.metadata.title.content",
f"crawl.{analysis}.metadata.description.content",
f"crawl.{analysis}.metadata.h1.contents"
],
"metrics": [],
"sort": [{"field": f"crawl.{analysis}.url", "order": "asc"}]
}
# Construct the full payload for the /jobs endpoint
export_payload = {
"job_type": "export",
"payload": {
"username": org,
"project": project,
"connector": "direct_download",
"formatter": "csv",
"export_size": 1000000,
"query": {
"collections": [f"crawl.{analysis}"],
"query": common_bql_query_export
}
}
}
print(f"Initiating export job (URL, Title, Desc, H1) using Export Mode ({jobs_url})...")
full_job_url = None
try:
# 1. Submit Export Job Request
response = httpx.post(jobs_url, headers=headers, json=export_payload, timeout=60.0)
response.raise_for_status()
job_data = response.json()
job_url_path = job_data.get('job_url')
if not job_url_path:
print("Failed to get job URL from response.")
print("Response:", job_data)
return None
full_job_url = f"https://api.botify.com{job_url_path}"
print(f"\nExport job submitted successfully!")
print(f"Job Status URL: {full_job_url}")
# 2. Poll for Job Completion
print("Polling job status", end="", flush=True)
attempts = 100 # Limit number of attempts (e.g., 100 * 5s = ~8 mins)
delay = 5 # Seconds between polls
while attempts > 0:
time.sleep(delay)
try:
poll_response = httpx.get(full_job_url, headers=headers, timeout=30.0)
poll_response.raise_for_status()
status_data = poll_response.json()
job_status = status_data.get("job_status")
print(".", end="", flush=True) # Progress indicator
if job_status == "DONE":
print("\nJob completed!")
download_url = status_data.get("results", {}).get("download_url")
if download_url:
# 3. Download the File
# Construct a descriptive filename
filename = f"{org}_{project}_{analysis}_url_metadata.csv"
save_file_path = save_path / filename
if download_file(download_url, save_file_path, headers):
return save_file_path # Success!
else:
return None # Download failed
else:
print("Error: Job DONE but no download URL found.")
return None
elif job_status == "FAILED":
print("\nExport job failed.")
print("Failure details:", status_data.get("results"))
return None
except httpx.HTTPStatusError as poll_err:
print(f"\nError polling job status: {poll_err.response.status_code}")
print(f"Polling response: {poll_err.response.text}")
# Decide whether to retry or fail based on error (e.g., fail on 404)
if poll_err.response.status_code == 404:
print("Job URL not found, aborting.")
return None
# Continue polling for other errors like timeouts or temporary server issues
except Exception as poll_e:
print(f"\nUnexpected error during polling: {poll_e}")
# Continue polling or break depending on desired robustness
attempts -= 1
print("\nPolling timed out after maximum attempts.")
return None
    except httpx.HTTPStatusError as e:
print(f"API Error initiating job: {e.response.status_code} - {e.response.text}")
return None
except Exception as e:
print(f"An error occurred initiating job: {e}")
return None
# --- Main Execution ---
print("\n--- Running Export Example ---")
# Ensure headers is defined from config section
downloaded_path = initiate_and_download_export(org, project, analysis, api_key, headers)
if downloaded_path:
print(f"\nExport successfully downloaded to: {downloaded_path}")
else:
print("\nExport process failed or timed out.")
Key Differences Summarized
Feature | List Mode (/query) | Export Mode (/jobs) |
---|---|---|
Endpoint | /v1/projects/{org}/{project}/query | /v1/jobs |
Org/Project In | URL Path | JSON Payload (payload.username, payload.project) |
Behavior | Synchronous | Asynchronous (Job + Polling + Download) |
Use Case | Small lists, Aggregates, Query Testing | Large datasets (up to 1M rows), File Generation |
Data Limit | Small (e.g., 500 default), Paginated | Large (up to 1,000,000 rows) |
Output Format | JSON response with results list | Downloadable File (CSV, etc.) via URL |
Query Structure | Payload is the BQL query | BQL query is nested within the Job Payload |
Choosing the Right Mode
- Use List Mode when you need immediate access to a small number of rows (e.g., showing a preview, less than a few thousand rows) or when you need aggregate data (counts, sums, averages).
- Use Export Mode when you need the full dataset matching your query criteria, especially if it exceeds a few thousand rows, and you intend to process it offline as a file. Remember the 1 million row limit per export. For larger datasets, you may need multiple export jobs with different filters.
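As a rough rule of thumb, the choice boils down to something like this (the row threshold here is my own illustrative cutoff, not a Botify-documented value):

def choose_mode(expected_rows, need_file=False):
    """Pick between the synchronous /query endpoint and the async /jobs export."""
    if need_file or expected_rows > 2000:
        return "export"  # /v1/jobs: submit job, poll, download CSV (up to 1M rows)
    return "list"        # /v1/projects/{org}/{project}/query: immediate JSON results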
Got that, Claude? Do you see where we’re going with this?
You already are sitting on top of finding the maximum click-depth you can use to do a full URL export of a site. These examples show you how to add the title, meta description and H1 to the download request, and how to actually finish out the download and get the file local.
Now as far as where the file should go, check for a downloads directory relative to where the server was run from. If it doesn’t exist, create it. And so on down through an org slug corresponding to the org value in the query, and the project slug as well, and even down through the analysis slug. In this way there will never be file collision problems, and we will always be able to construct reliable paths to link to the downloads.
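That directory convention is simple enough to sketch with pathlib (Claude’s create_download_directory further below does essentially the same thing):

from pathlib import Path

def build_download_dir(org, project, analysis):
    """Create downloads/<org>/<project>/<analysis> relative to the server's working directory."""
    path = Path("downloads") / org / project / analysis
    path.mkdir(parents=True, exist_ok=True)
    return path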
There are even more things we are going to have to handle, like the unzipping and what filename to use. But don’t worry about that for now, except for using the filename suggested in the download link. We’ll worry about post-processing later. This is plenty to take in for now.
What I want you to do now Claude is to really take in the difference between the list and the export modes of the API. There are different endpoints and depending on which you use, the org and project are either in the URL or the JSON payload. And when it’s the export API, there’s this whole polling thing. It’s a lot to absorb in planning the next step.
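Boiled down to the part that keeps tripping people up, the contrast looks like this (slugs are placeholders, and the jobs payload is trimmed to just the relevant keys):

org, project = "example-org", "example.com"

# List mode: org and project ride in the URL path.
list_url = f"https://api.botify.com/v1/projects/{org}/{project}/query"

# Export mode: the endpoint is fixed and the slugs travel inside the JSON payload.
jobs_url = "https://api.botify.com/v1/jobs"
jobs_payload_stub = {
    "job_type": "export",
    "payload": {"username": org, "project": project},  # plus connector, formatter, query, etc.
}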
But do plan the next step. It will involve splicing in yet another step. And of course remember how fragile everything is. Both Pipulate workflows and the Botify API are fragile. There’s not a lot of creative wiggle room. With the Pipulate Workflows, you really have to think in terms of splicing a step into the middle repeating the reliable patterns you already see. And with the Botify API, it’s sticking to the examples as close as possible.
Plan away, O Claude, mon Clauditaine!
Claude Does Download First Step
Claude 3.7 Sonnet:
cd ~/repos/pipulate && mkdir -p downloads
Based on the detailed API documentation and understanding the differences between Botify API’s list and export modes, I’ll plan our next step carefully. We need to create a step that initiates an export job, polls for completion, and provides download information to the user.
Implementation Plan for Step 4: CSV Export
1. Create a New Step
We’ll add a new step that follows the maximum click depth step:
steps = [
Step(id='step_01', done='url', show='Botify Project URL', refill=True),
Step(id='step_02', done='analysis', show='Analysis Selection', refill=False),
Step(id='step_03', done='depth', show='Maximum Click Depth', refill=False),
Step(id='step_04', done='export_url', show='CSV Export', refill=False),
]
2. Implement the Export Job Logic
We’ll create a helper method that:
- Initiates an export job using the
/jobs
endpoint - Handles polling for job completion
- Returns the download URL and job metadata
3. Design the UI Flow
- Step 4 Display: Show a form with configurable export options
- Include checkboxes for what fields to include (title, meta description, H1)
- Have a submit button to initiate the export
- Step 4 Submit: Process the request, start the export job, poll for completion
- Step 4 Complete: Show the download link with job metadata
Let’s implement this with careful attention to Pipulate patterns and Botify API structures:
# pipulate/plugins/50_botify_export.py
# Add to the steps list
steps = [
Step(id='step_01', done='url', show='Botify Project URL', refill=True),
Step(id='step_02', done='analysis', show='Analysis Selection', refill=False),
Step(id='step_03', done='depth', show='Maximum Click Depth', refill=False),
Step(id='step_04', done='export_url', show='CSV Export', refill=False),
]
# Add these methods to the BotifyExport class
async def initiate_export_job(self, org, project, analysis, depth, include_fields, api_token):
"""
Initiate a Botify export job with the specified parameters
Args:
org: Organization slug
project: Project slug
analysis: Analysis slug
depth: Maximum depth to include
include_fields: Dictionary of fields to include (title, meta_desc, h1)
api_token: Botify API token
Returns:
Tuple of (job_url, error_message)
"""
jobs_url = "https://api.botify.com/v1/jobs"
headers = {
"Authorization": f"Token {api_token}",
"Content-Type": "application/json"
}
# Define the query dimensions based on selected fields
dimensions = [f"crawl.{analysis}.url"]
if include_fields.get('title', False):
dimensions.append(f"crawl.{analysis}.metadata.title.content")
if include_fields.get('meta_desc', False):
dimensions.append(f"crawl.{analysis}.metadata.description.content")
if include_fields.get('h1', False):
dimensions.append(f"crawl.{analysis}.metadata.h1.contents")
# Build the BQL query with depth filter
query = {
"dimensions": dimensions,
"metrics": [],
"sort": [{"field": f"crawl.{analysis}.url", "order": "asc"}],
"filters": {
"field": f"crawl.{analysis}.depth",
"predicate": "lte",
"value": int(depth)
}
}
# Construct the job payload
export_payload = {
"job_type": "export",
"payload": {
"username": org,
"project": project,
"connector": "direct_download",
"formatter": "csv",
"export_size": 1000000,
"query": {
"collections": [f"crawl.{analysis}"],
"query": query
}
}
}
try:
# Submit export job request
async with httpx.AsyncClient() as client:
response = await client.post(jobs_url, headers=headers, json=export_payload, timeout=60.0)
response.raise_for_status()
job_data = response.json()
job_url_path = job_data.get('job_url')
if not job_url_path:
return None, "Failed to get job URL from response"
full_job_url = f"https://api.botify.com{job_url_path}"
return full_job_url, None
except Exception as e:
logger.error(f"Error initiating export job: {str(e)}")
return None, f"Error initiating export job: {str(e)}"
async def poll_job_status(self, job_url, api_token, max_attempts=1):
"""
Poll the job status URL to check for completion
This is a simplified polling mechanism for the initial step.
The actual polling will happen asynchronously and report progress.
Args:
job_url: Full job URL to poll
api_token: Botify API token
max_attempts: Maximum number of polling attempts
Returns:
Tuple of (is_complete, download_url, error_message)
"""
headers = {"Authorization": f"Token {api_token}"}
try:
for _ in range(max_attempts):
async with httpx.AsyncClient() as client:
response = await client.get(job_url, headers=headers, timeout=30.0)
response.raise_for_status()
status_data = response.json()
job_status = status_data.get("job_status")
if job_status == "DONE":
download_url = status_data.get("results", {}).get("download_url")
if download_url:
return True, download_url, None
else:
return False, None, "Job completed but no download URL found"
elif job_status == "FAILED":
return False, None, f"Export job failed: {status_data.get('results')}"
# For initial polling, we just check once
return False, None, None
except Exception as e:
logger.error(f"Error polling job status: {str(e)}")
return False, None, f"Error polling job status: {str(e)}"
async def create_download_directory(self, org, project, analysis):
"""Create a nested directory structure for downloads"""
download_path = Path("downloads") / org / project / analysis
download_path.mkdir(parents=True, exist_ok=True)
return download_path
async def step_04(self, request):
"""
Display the CSV export form with field selection options
"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
step_id = "step_04"
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else None
pipeline_id = db.get("pipeline_id", "unknown")
state = pip.read_state(pipeline_id)
step_data = pip.get_step_data(pipeline_id, step_id, {})
user_val = step_data.get(step.done, "")
# Handle finalized state
finalize_data = pip.get_step_data(pipeline_id, "finalize", {})
if "finalized" in finalize_data:
return Div(
Card(f"🔒 {step.show}: {user_val}"),
Div(id=next_step_id, hx_get=f"/{self.app_name}/{next_step_id}", hx_trigger="load")
)
# If step is complete and not being reverted, show revert control
if user_val and state.get("_revert_target") != step_id:
# Extract job_id from the stored URL for display
job_id = user_val.split("/")[-1] if user_val else "Unknown"
display_msg = f"{step.show}: Job ID {job_id} (click to download)"
return Div(
pip.revert_control(step_id=step_id, app_name=app_name, message=display_msg, steps=steps),
Div(id=next_step_id, hx_get=f"/{app_name}/{next_step_id}", hx_trigger="load")
)
# Get data from previous steps
step_01_data = pip.get_step_data(pipeline_id, "step_01", {})
step_02_data = pip.get_step_data(pipeline_id, "step_02", {})
step_03_data = pip.get_step_data(pipeline_id, "step_03", {})
org = step_01_data.get('org')
project = step_01_data.get('project')
analysis = step_02_data.get('analysis')
depth = step_03_data.get('depth')
# Check if we have all required data
if not all([org, project, analysis, depth]):
return P("Missing required data from previous steps. Please complete all steps first.",
style=pip.get_style("error"))
await self.message_queue.add(pip, self.step_messages[step_id]["input"], verbatim=True)
# Create the field selection form
return Div(
Card(
H4(f"{pip.fmt(step_id)}: Configure {step.show}"),
P(f"Export URLs up to depth {depth} from the {analysis} analysis.",
style="margin-bottom: 1rem;"),
P("Select additional fields to include in the export:",
style="margin-bottom: 0.5rem;"),
Form(
Div(
Label(
Input(type="checkbox", name="include_title", value="true", checked=True),
" Include page titles",
style="display: block; margin-bottom: 0.5rem;"
),
Label(
Input(type="checkbox", name="include_meta_desc", value="true", checked=True),
" Include meta descriptions",
style="display: block; margin-bottom: 0.5rem;"
),
Label(
Input(type="checkbox", name="include_h1", value="true", checked=True),
" Include H1 headings",
style="display: block; margin-bottom: 1rem;"
),
style="margin-bottom: 1.5rem;"
),
Button("Start Export", type="submit", cls="primary"),
P("Note: Large exports may take several minutes to process.",
style="font-size: 0.8em; color: #666; margin-top: 0.5rem;"),
hx_post=f"/{app_name}/{step.id}_submit",
hx_target=f"#{step.id}"
)
),
Div(id=next_step_id),
id=step.id
)
async def step_04_submit(self, request):
"""Handle the submission of the CSV export options and start the export job"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
step_id = "step_04"
step_index = self.steps_indices[step_id]
step = steps[step_index]
pipeline_id = db.get("pipeline_id", "unknown")
# Handle finalized state using helper
if step.done == 'finalized':
return await pip.handle_finalized_step(pipeline_id, step_id, steps, app_name, self)
# Get form data
form = await request.form()
include_title = form.get("include_title") == "true"
include_meta_desc = form.get("include_meta_desc") == "true"
include_h1 = form.get("include_h1") == "true"
# Get data from previous steps
state = pip.read_state(pipeline_id)
step_01_data = pip.get_step_data(pipeline_id, "step_01", {})
step_02_data = pip.get_step_data(pipeline_id, "step_02", {})
step_03_data = pip.get_step_data(pipeline_id, "step_03", {})
org = step_01_data.get('org')
project = step_01_data.get('project')
analysis = step_02_data.get('analysis')
depth = step_03_data.get('depth')
# Check if we have all required data
if not all([org, project, analysis, depth]):
return P("Missing required data from previous steps.", style=pip.get_style("error"))
# Prepare fields to include
include_fields = {
'title': include_title,
'meta_desc': include_meta_desc,
'h1': include_h1
}
try:
# Read API token
api_token = self.read_api_token()
# Create download directory
download_dir = await self.create_download_directory(org, project, analysis)
# Initiate the export job
job_url, error = await self.initiate_export_job(
org, project, analysis, depth, include_fields, api_token
)
if error:
return P(f"Error initiating export: {error}", style=pip.get_style("error"))
# Extract job ID for display
job_id = job_url.split("/")[-1]
# Do a quick poll to see if the job completed very quickly
is_complete, download_url, poll_error = await self.poll_job_status(job_url, api_token)
# Store job info in state
if step_id not in state:
state[step_id] = {}
state[step_id].update({
'job_url': job_url,
'job_id': job_id,
'org': org,
'project': project,
'analysis': analysis,
'download_url': download_url if is_complete else None,
'status': 'DONE' if is_complete else 'PROCESSING',
'download_path': str(download_dir),
'include_fields': include_fields
})
# Store the job URL as the "done" field
state[step_id][step.done] = job_url
# Write the complete state back
pip.write_state(pipeline_id, state)
# Prepare status message
if is_complete:
status_msg = f"Export completed! Job ID: {job_id}\nDownload URL: {download_url}"
else:
status_msg = f"Export job started with Job ID: {job_id}\nThe export is processing and may take several minutes.\nTo check status, go to: {job_url}"
# Send message
await self.message_queue.add(pip, status_msg, verbatim=True)
# Create response UI
status_display = "Complete ✅" if is_complete else "Processing ⏳"
download_btn = Button("Download CSV", type="button", cls="primary") if is_complete else None
return Card(
H4(f"Export Status: {status_display}"),
P(f"Job ID: {job_id}", style="margin-bottom: 0.5rem;"),
P(f"Exporting URLs up to depth {depth}", style="margin-bottom: 0.5rem;"),
P(f"Including fields: " +
", ".join([k for k, v in include_fields.items() if v]),
style="margin-bottom: 1rem;"),
download_btn if is_complete else P("Please wait for the export to complete. This can take several minutes for large sites.",
style="color: #666;"),
pip.create_navigation_buttons(step_id, step_index, steps, app_name)
)
except Exception as e:
logger.error(f"Error in export submission: {str(e)}")
return P(f"An error occurred: {str(e)}", style=pip.get_style("error"))
This implementation provides:
- Integration with Botify’s Export API: Following the exact payload structure and endpoint requirements (a sketch of that payload shape appears after this list)
- Field Selection: Letting users choose which metadata to include (title, description, H1)
- Directory Structure: Creating organized download folders by org/project/analysis
- Basic Job Status UI: Displaying the job ID, export parameters, and status
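The initiate_export_job helper referenced in the first bullet isn’t shown in this excerpt. For orientation only, here is a rough sketch of the shape such a call might take; the /v1/jobs endpoint, the job_type and payload keys, and the field names are all assumptions rather than a verified Botify spec.
import httpx
# Hedged sketch only: the endpoint path, payload keys, and field names are
# assumptions, not Botify's documented contract. Returns (job_url, error).
async def initiate_export_job(self, org, project, analysis, depth, include_fields, api_token):
    url = "https://api.botify.com/v1/jobs"
    headers = {"Authorization": f"Token {api_token}", "Content-Type": "application/json"}
    dimensions = [f"crawl.{analysis}.url"]
    if include_fields.get("title"):
        dimensions.append(f"crawl.{analysis}.metadata.title.content")  # assumed field name
    payload = {
        "job_type": "export",              # assumed job type
        "payload": {
            "username": org,               # assumed scoping keys
            "project": project,
            "connector": "direct_download",
            "formatter": "csv",
            "export_size": 1_000_000,
            "query": {
                "collections": [f"crawl.{analysis}"],
                "query": {
                    "dimensions": dimensions,
                    "filters": {"field": f"crawl.{analysis}.depth",
                                "predicate": "lte", "value": int(depth)},
                },
            },
        },
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(url, headers=headers, json=payload, timeout=60.0)
        if response.status_code >= 400:
            return None, f"Export request failed with HTTP {response.status_code}"
        # Assumed response shape: a job_url pointing at the polling endpoint.
        return response.json().get("job_url"), None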
Future Enhancements (Beyond Initial Implementation)
- Background Polling: Implement a background task that continues polling the job status (see the sketch after this list)
- Download Management: Add actual file downloading, unzipping if needed
- Progress Indicators: Add more detailed progress information during the export
- Result Browser: Allow users to browse and preview exported data
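Background polling is listed above as a future enhancement rather than implemented; here is a minimal sketch of the idea, reusing the poll_job_status coroutine from this workflow. The 30-second interval and attempt cap are arbitrary choices, not Botify guidance.
import asyncio
# Minimal background-polling sketch reusing poll_job_status from this workflow.
# The interval and max_checks values are arbitrary, not Botify guidance.
async def poll_until_done(self, job_url, api_token, interval=30, max_checks=60):
    """Poll a Botify export job until it completes, fails, or we give up."""
    for _ in range(max_checks):
        is_complete, download_url, error = await self.poll_job_status(job_url, api_token)
        if error:
            return None, error
        if is_complete:
            return download_url, None
        await asyncio.sleep(interval)
    return None, "Gave up waiting for the export job to complete"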
This approach respects both the fragility of the Pipulate workflow patterns and the precise requirements of the Botify API. I’ve carefully followed the established step patterns and avoided creative departures from the examples provided.
Would you like me to refine any part of this implementation before we proceed?
Yep, that was a great incremental step. I verified that it caused the file to be generated on the server. It could have been polled and downloaded, so we are already at those suggested future enhancements.
Once we acquire the download URL, we should probably store it in the JSON data object somewhere so that we can intercept attempts to generate a new export using the exact same API parameters (org, project, analysis). If the export has already been generated and the workflow is about to re-download it, we can check whether the file already exists first. Basically, we want to make this idempotent. We don’t have to go crazy with it; the occasional extra export attempt or download isn’t so bad. I’m just going for general efficiency.
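To make that concrete, here is a minimal sketch of the kind of check I mean, assuming the downloads/org/project/analysis layout this workflow already uses (the helper name and filename pattern are illustrative, not part of the plugin):
from pathlib import Path
# Sketch of the idempotency check described above; the helper name and the
# depth-based filename pattern are illustrative assumptions.
def find_existing_export(org: str, project: str, analysis: str, depth) -> Path | None:
    """Return a previously downloaded CSV for this parameter combination, if any."""
    export_dir = Path("downloads") / org / project / analysis
    if not export_dir.exists():
        return None
    matches = sorted(export_dir.glob(f"*depth_{depth}*.csv"))
    return matches[0] if matches else None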
Yes, absolutely do the real download and put it into that file location we prepared for it. And yes, unzip it. It will need it.
Progress indicators… hmmm. We definitely want them, but I don’t want them to be a distraction. PicoCSS has some convenient ones you can toss in if it’s not too much extra to think about.
And file browsing will be broken out to another step. It’s got a number of aspects to it that I don’t want rolled into this step.
I think you have everything you need. Look back in the discussion history for the documentation I made for this step regarding polling and download.
Glorious Claude Step
I’ll implement the file downloading, unzipping, and idempotency features for Step 4. Let me modify the code to add these improvements:
# pipulate/plugins/50_botify_export.py
async def step_04(self, request):
"""
Display the CSV export form with field selection options
"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
step_id = "step_04"
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else None
pipeline_id = db.get("pipeline_id", "unknown")
state = pip.read_state(pipeline_id)
step_data = pip.get_step_data(pipeline_id, step_id, {})
user_val = step_data.get(step.done, "")
# Handle finalized state
finalize_data = pip.get_step_data(pipeline_id, "finalize", {})
if "finalized" in finalize_data:
# For finalized state, show the download link if available
download_url = step_data.get('download_url')
local_file = step_data.get('local_file')
if local_file and Path(local_file).exists():
download_msg = f"🔒 {step.show}: CSV file downloaded to {local_file}"
elif download_url:
download_msg = f"🔒 {step.show}: Ready for download"
else:
download_msg = f"🔒 {step.show}: Job ID {user_val.split('/')[-1] if user_val else 'Unknown'}"
return Div(
Card(download_msg),
Div(id=next_step_id, hx_get=f"/{self.app_name}/{next_step_id}", hx_trigger="load")
)
# If step is complete and not being reverted, show revert control with download info
if user_val and state.get("_revert_target") != step_id:
job_id = user_val.split("/")[-1] if user_val else "Unknown"
download_url = step_data.get('download_url')
local_file = step_data.get('local_file')
# Display different message based on download status
if local_file and Path(local_file).exists():
display_msg = f"{step.show}: CSV file downloaded (Job ID {job_id})"
elif download_url:
display_msg = f"{step.show}: Ready for download (Job ID {job_id})"
else:
# Poll the job status to check if it's complete
try:
api_token = self.read_api_token()
is_complete, download_url, _ = await self.poll_job_status(user_val, api_token)
if is_complete and download_url:
# Update the state with the download URL
state[step_id]['download_url'] = download_url
state[step_id]['status'] = 'DONE'
pip.write_state(pipeline_id, state)
display_msg = f"{step.show}: Ready for download (Job ID {job_id})"
else:
display_msg = f"{step.show}: Processing (Job ID {job_id})"
except Exception:
display_msg = f"{step.show}: Job ID {job_id}"
revert_control = pip.revert_control(
step_id=step_id,
app_name=app_name,
message=display_msg,
steps=steps
)
# Add a download button if the file is ready but not yet downloaded
if download_url and not (local_file and Path(local_file).exists()):
download_button = Form(
Button("Download CSV", type="submit", cls="secondary outline"),
hx_post=f"/{app_name}/download_csv",
hx_target=f"#{step_id}",
hx_swap="outerHTML",
hx_vals=f'{{"pipeline_id": "{pipeline_id}"}}'
)
return Div(
revert_control,
download_button,
Div(id=next_step_id, hx_get=f"/{app_name}/{next_step_id}", hx_trigger="load"),
id=step_id
)
return Div(
revert_control,
Div(id=next_step_id, hx_get=f"/{app_name}/{next_step_id}", hx_trigger="load"),
id=step_id
)
# Get data from previous steps
step_01_data = pip.get_step_data(pipeline_id, "step_01", {})
step_02_data = pip.get_step_data(pipeline_id, "step_02", {})
step_03_data = pip.get_step_data(pipeline_id, "step_03", {})
org = step_01_data.get('org')
project = step_01_data.get('project')
analysis = step_02_data.get('analysis')
depth = step_03_data.get('depth')
# Check if we have all required data
if not all([org, project, analysis, depth]):
return P("Missing required data from previous steps. Please complete all steps first.",
style=pip.get_style("error"))
# Check if an export with these parameters already exists
existing_path = Path("downloads") / org / project / analysis
existing_files = list(existing_path.glob(f"*depth_{depth}*.csv")) if existing_path.exists() else []
if existing_files:
existing_file = existing_files[0]
await self.message_queue.add(pip, f"Found existing export: {existing_file.name}", verbatim=True)
return Div(
Card(
H4(f"{pip.fmt(step_id)}: {step.show}"),
P(f"An export for project '{project}', analysis '{analysis}' at depth {depth} already exists:",
style="margin-bottom: 0.5rem;"),
P(f"File: {existing_file.name}", style="margin-bottom: 1rem;"),
Div(
Button("Use Existing Export", type="button", cls="primary",
hx_post=f"/{app_name}/use_existing_export",
hx_target=f"#{step.id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}", "file_path": "{existing_file}"}}'
),
Button("Create New Export", type="button",
hx_trigger="click",
hx_target=f"#{step.id}",
hx_get=f"/{app_name}/{step.id}/new"
),
style="display: flex; gap: 1rem;"
)
),
Div(id=next_step_id),
id=step.id
)
await self.message_queue.add(pip, self.step_messages[step_id]["input"], verbatim=True)
# Create the field selection form
return Div(
Card(
H4(f"{pip.fmt(step_id)}: Configure {step.show}"),
P(f"Export URLs up to depth {depth} from the {analysis} analysis.",
style="margin-bottom: 1rem;"),
P("Select additional fields to include in the export:",
style="margin-bottom: 0.5rem;"),
Form(
Div(
Label(
Input(type="checkbox", name="include_title", value="true", checked=True),
" Include page titles",
style="display: block; margin-bottom: 0.5rem;"
),
Label(
Input(type="checkbox", name="include_meta_desc", value="true", checked=True),
" Include meta descriptions",
style="display: block; margin-bottom: 0.5rem;"
),
Label(
Input(type="checkbox", name="include_h1", value="true", checked=True),
" Include H1 headings",
style="display: block; margin-bottom: 1rem;"
),
style="margin-bottom: 1.5rem;"
),
Button("Start Export", type="submit", cls="primary"),
P("Note: Large exports may take several minutes to process.",
style="font-size: 0.8em; color: #666; margin-top: 0.5rem;"),
hx_post=f"/{app_name}/{step.id}_submit",
hx_target=f"#{step.id}"
)
),
Div(id=next_step_id),
id=step.id
)
async def step_04_new(self, request):
"""Handle request to create a new export instead of using existing one"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
step_id = "step_04"
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else None
pipeline_id = db.get("pipeline_id", "unknown")
# Get data from previous steps
step_01_data = pip.get_step_data(pipeline_id, "step_01", {})
step_02_data = pip.get_step_data(pipeline_id, "step_02", {})
step_03_data = pip.get_step_data(pipeline_id, "step_03", {})
org = step_01_data.get('org')
project = step_01_data.get('project')
analysis = step_02_data.get('analysis')
depth = step_03_data.get('depth')
# Create the field selection form for new export
return Div(
Card(
H4(f"{pip.fmt(step_id)}: Configure {step.show}"),
P(f"Export URLs up to depth {depth} from the {analysis} analysis.",
style="margin-bottom: 1rem;"),
P("Select additional fields to include in the export:",
style="margin-bottom: 0.5rem;"),
Form(
Div(
Label(
Input(type="checkbox", name="include_title", value="true", checked=True),
" Include page titles",
style="display: block; margin-bottom: 0.5rem;"
),
Label(
Input(type="checkbox", name="include_meta_desc", value="true", checked=True),
" Include meta descriptions",
style="display: block; margin-bottom: 0.5rem;"
),
Label(
Input(type="checkbox", name="include_h1", value="true", checked=True),
" Include H1 headings",
style="display: block; margin-bottom: 1rem;"
),
style="margin-bottom: 1.5rem;"
),
Button("Start Export", type="submit", cls="primary"),
P("Note: Large exports may take several minutes to process.",
style="font-size: 0.8em; color: #666; margin-top: 0.5rem;"),
hx_post=f"/{app_name}/{step.id}_submit",
hx_target=f"#{step.id}"
)
),
Div(id=next_step_id),
id=step.id
)
async def use_existing_export(self, request):
"""
Use an existing export file instead of creating a new one
"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
step_id = "step_04"
step_index = self.steps_indices[step_id]
step = steps[step_index]
# Get form data
form = await request.form()
pipeline_id = form.get("pipeline_id")
file_path = form.get("file_path")
if not all([pipeline_id, file_path]):
return P("Missing required parameters", style=pip.get_style("error"))
# Update state with existing file information
state = pip.read_state(pipeline_id)
if step_id not in state:
state[step_id] = {}
state[step_id].update({
'local_file': file_path,
'status': 'DONE',
'is_existing_file': True
})
# Set a dummy URL in the done field since we need something there
state[step_id][step.done] = f"existing://{file_path}"
pip.write_state(pipeline_id, state)
# Send confirmation message
await self.message_queue.add(pip, f"Using existing export file: {file_path}", verbatim=True)
# Return navigation controls
return pip.create_step_navigation(step_id, step_index, steps, app_name, file_path)
async def download_csv(self, request):
"""
Handle downloading the CSV file from a completed export job
"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
step_id = "step_04"
step_index = self.steps_indices[step_id]
step = steps[step_index]
# Get form data
form = await request.form()
pipeline_id = form.get("pipeline_id")
if not pipeline_id:
return P("Missing pipeline_id parameter", style=pip.get_style("error"))
# Get state data
state = pip.read_state(pipeline_id)
step_data = pip.get_step_data(pipeline_id, step_id, {})
# Extract necessary information
download_url = step_data.get('download_url')
org = step_data.get('org')
project = step_data.get('project')
analysis = step_data.get('analysis')
depth = step_data.get('depth', '0')
job_id = step_data.get('job_id', 'unknown')
if not download_url:
# Check if the job is complete and get the download URL
try:
job_url = step_data.get(step.done)
if not job_url:
return P("No job URL found", style=pip.get_style("error"))
api_token = self.read_api_token()
is_complete, download_url, error = await self.poll_job_status(job_url, api_token)
if not is_complete or not download_url:
return Div(
P("Export job is still processing. Please try again in a few minutes.", style="margin-bottom: 1rem;"),
Progress(value="60", max="100", style="width: 100%;"),
P(f"Error: {error}" if error else "", style=pip.get_style("error"))
)
# Update state with download URL
state[step_id]['download_url'] = download_url
state[step_id]['status'] = 'DONE'
pip.write_state(pipeline_id, state)
except Exception as e:
return P(f"Error checking job status: {str(e)}", style=pip.get_style("error"))
# Create the download directory
try:
download_dir = await self.create_download_directory(org, project, analysis)
# Generate filename based on depth and fields included
include_fields = step_data.get('include_fields', {})
fields_suffix = "_".join(k for k, v in include_fields.items() if v)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{org}_{project}_{analysis}_depth_{depth}_{fields_suffix}_{timestamp}.csv"
local_file_path = download_dir / filename
# Start downloading with progress feedback
await self.message_queue.add(pip, f"Starting download from {download_url}", verbatim=True)
# Show download progress
return Div(
Card(
H4("Downloading CSV File"),
P(f"Downloading export to {local_file_path}", style="margin-bottom: 1rem;"),
Progress(value="10", max="100", style="width: 100%;"),
P("Please wait, this may take a few minutes for large files...",
style="font-size: 0.9em; color: #666;")
),
hx_get=f"/{app_name}/download_progress",
hx_trigger="load",
hx_target=f"#{step_id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}", "download_url": "{download_url}", "local_file": "{local_file_path}"}}',
id=step_id
)
except Exception as e:
return P(f"Error preparing download: {str(e)}", style=pip.get_style("error"))
async def download_progress(self, request):
"""
Handle the actual file download and show progress
"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
step_id = "step_04"
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else None
# Get request data
pipeline_id = request.query_params.get("pipeline_id")
download_url = request.query_params.get("download_url")
local_file = request.query_params.get("local_file")
if not all([pipeline_id, download_url, local_file]):
return P("Missing required parameters", style=pip.get_style("error"))
local_file_path = Path(local_file)
try:
# Perform the actual download
api_token = self.read_api_token()
headers = {"Authorization": f"Token {api_token}"}
# Create parent directories if they don't exist
local_file_path.parent.mkdir(parents=True, exist_ok=True)
# Download the file using httpx with streaming
async with httpx.AsyncClient() as client:
async with client.stream("GET", download_url, headers=headers, follow_redirects=True) as response:
response.raise_for_status()
# Get content length if available
total_size = int(response.headers.get('content-length', 0))
# Open file for writing
with open(local_file_path, 'wb') as f:
async for chunk in response.aiter_bytes(chunk_size=8192):
f.write(chunk)
# Check if file needs to be unzipped
if local_file_path.suffix.lower() in ('.zip', '.gz'):
await self.message_queue.add(pip, f"Extracting {local_file_path}", verbatim=True)
# Unzip the file (implementation depends on file type)
if local_file_path.suffix.lower() == '.zip':
import zipfile
with zipfile.ZipFile(local_file_path, 'r') as zip_ref:
# Get the first CSV file in the archive
csv_files = [f for f in zip_ref.namelist() if f.lower().endswith('.csv')]
if not csv_files:
return P("No CSV files found in the downloaded ZIP archive", style=pip.get_style("error"))
# Extract the CSV file
csv_file = csv_files[0]
extracted_path = local_file_path.parent / csv_file
zip_ref.extract(csv_file, local_file_path.parent)
# Update the local file path to the extracted file
local_file_path = extracted_path
elif local_file_path.suffix.lower() == '.gz':
import gzip
import shutil
# Assume it's a .csv.gz file and create a .csv file
extracted_path = local_file_path.with_suffix('')
with gzip.open(local_file_path, 'rb') as f_in:
with open(extracted_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
# Update the local file path to the extracted file
local_file_path = extracted_path
# Update state with local file path
state = pip.read_state(pipeline_id)
state[step_id]['local_file'] = str(local_file_path)
pip.write_state(pipeline_id, state)
# Get file information
file_size_bytes = local_file_path.stat().st_size
file_size_mb = file_size_bytes / (1024 * 1024)
# Send success message
await self.message_queue.add(
pip,
f"Successfully downloaded and prepared CSV file:\n"
f"Path: {local_file_path}\n"
f"Size: {file_size_mb:.2f} MB",
verbatim=True
)
# Return success UI with file information
return Div(
pip.revert_control(
step_id=step_id,
app_name=app_name,
message=f"{step.show}: CSV downloaded ({file_size_mb:.2f} MB)",
steps=steps
),
Div(id=next_step_id, hx_get=f"/{app_name}/{next_step_id}", hx_trigger="load"),
id=step_id
)
except Exception as e:
return Div(
Card(
H4("Download Error"),
P(f"Error downloading CSV file: {str(e)}", style=pip.get_style("error")),
P(f"Download URL: {download_url}"),
P(f"Target file: {local_file_path}"),
Button("Try Again", type="button", cls="primary",
hx_post=f"/{app_name}/download_csv",
hx_target=f"#{step_id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}"}}')
),
id=step_id
)
Now let’s add the new route registration to the __init__
method:
# pipulate/plugins/50_botify_export.py
# Add these to the routes list in __init__ method
routes = [
# ... existing routes ...
(f"/{app_name}/download_csv", self.download_csv, ["POST"]),
(f"/{app_name}/download_progress", self.download_progress),
(f"/{app_name}/use_existing_export", self.use_existing_export, ["POST"]),
(f"/{app_name}/{step_id}/new", getattr(self, f"{step_id}_new")),
]
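The registration loop itself isn’t shown here. In a Starlette-style app (which FastHTML builds on), a list like this would typically be consumed along the following lines; the register_routes helper and the default to GET are assumptions, and Pipulate’s real wiring may differ.
from starlette.applications import Starlette
# Hypothetical registration helper; Pipulate's actual wiring may differ.
# Route tuples without an explicit method list default to GET.
def register_routes(app: Starlette, routes):
    for route in routes:
        path, handler = route[0], route[1]
        methods = route[2] if len(route) > 2 else ["GET"]
        app.add_route(path, handler, methods=methods)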
This implementation adds several important improvements:
- Idempotency and File Reuse:
- Checks for existing exports that match the current parameters
- Allows users to choose between using an existing export or creating a new one
- Stores file paths and URLs in state for future reference
- File Download and Processing:
- Downloads files with proper streaming to handle large files
- Extracts compressed files (zip or gzip) automatically
- Shows download progress using PicoCSS progress bars
- Provides file size and path information
- Enhanced UI Experience:
- Adds “Download CSV” button when export is ready but not yet downloaded
- Shows appropriate message based on the current state (processing, ready, downloaded)
- Handles errors gracefully with retry options
- Polling for Completion:
- Checks job status when viewing completed steps to see if downloads are ready
- Updates state with download URLs when jobs complete
This implementation follows the Pipulate workflow patterns closely while adding the robust file handling capabilities needed. The UI maintains the established look and feel, with PicoCSS progress indicators to show status without being distracting.
Would you like me to explain any specific part in more detail or make any adjustments to this implementation?
Fiddling Around With Solution
Me: Alright, this is doing the downloading and even the unzipping brilliantly! My only issue is with the process. The idempotency part is a problem. Fiddling around with a workflow lets me re-request the export over and over, which is the “expensive” part. Once we know an export request has successfully begun for an org/project/analysis combination, that should somehow block future export requests on the same combination, especially if the file has already been generated.
I noticed the slick CSV button there once, and I also notice that re-download attempts, once the CSV is in place, ask whether you want to use the existing file or generate a new one. So I feel there is some very slick stuff going on here, but it just does not feel fully worked out. Triggering the export again is too easy, and once an export has successfully begun, the wait during polling seems too difficult or fragile. There has only been one successful download so far.
Claude: You’re right about the idempotency issues. Let me enhance the workflow to better handle repeated export requests for the same data. Here’s what I’ll implement:
- Create a central export job registry in a JSON file to track all export jobs across all workflow instances
- Add stricter checks before initiating new export jobs
- Improve the status display and polling mechanism
# pipulate/plugins/50_botify_export.py
import asyncio
from collections import namedtuple
from datetime import datetime
from urllib.parse import urlparse
import httpx
from pathlib import Path
import json
import os
from fasthtml.common import *
from loguru import logger
"""
Pipulate Workflow Template
After copy/pasting this file, edit this docstring first so that your
AI coding assistant knows what you're trying to do.
This file demonstrates the basic pattern for Pipulate Workflows:
1. Define steps with optional transformations
2. Each step collects or processes data
3. Data flows from one step to the next
4. Steps can be reverted and the workflow can be finalized
To create your own Workflow:
1. Copy this file and rename the class, APP_NAME, DISPLAY_NAME, ENDPOINT_MESSAGE
2. Create a [filename].md in the training folder (no path needed) and set TRAINING_PROMPT to refer to it
3. Define your own steps
4. Implement custom validation and processing as needed
There are two types of apps in Pipulate:
1. Workflows - Linear, step-based apps. The part you're looking at. WET.
2. Apps - CRUD apps with a single table that inherit from BaseApp. DRY.
CRUD is DRY and Workflows are WET! Get ready to get WET!
"""
# Path to the export registry file
EXPORT_REGISTRY_FILE = Path("downloads/export_registry.json")
# This is the model for a Notebook cell or step (do not change)
Step = namedtuple('Step', ['id', 'done', 'show', 'refill', 'transform'], defaults=(None,))
Now let’s add the registry management functions:
# pipulate/plugins/50_botify_export.py
def get_export_key(org, project, analysis, depth):
"""Generate a unique key for an export configuration"""
return f"{org}_{project}_{analysis}_depth_{depth}"
def load_export_registry():
"""Load the export registry from file"""
try:
if EXPORT_REGISTRY_FILE.exists():
with open(EXPORT_REGISTRY_FILE, 'r') as f:
return json.load(f)
return {}
except Exception as e:
logger.error(f"Error loading export registry: {e}")
return {}
def save_export_registry(registry):
"""Save the export registry to file"""
try:
# Create directory if it doesn't exist
EXPORT_REGISTRY_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(EXPORT_REGISTRY_FILE, 'w') as f:
json.dump(registry, f, indent=2)
except Exception as e:
logger.error(f"Error saving export registry: {e}")
def register_export_job(org, project, analysis, depth, job_url, job_id):
"""Register a new export job in the registry"""
registry = load_export_registry()
export_key = get_export_key(org, project, analysis, depth)
# Check if this export already exists
if export_key not in registry:
registry[export_key] = []
# Add the new job
registry[export_key].append({
'job_url': job_url,
'job_id': job_id,
'status': 'PROCESSING',
'created': datetime.now().isoformat(),
'updated': datetime.now().isoformat(),
'download_url': None,
'local_file': None
})
save_export_registry(registry)
return export_key
def update_export_job(org, project, analysis, depth, job_id, updates):
"""Update an existing export job in the registry"""
registry = load_export_registry()
export_key = get_export_key(org, project, analysis, depth)
if export_key not in registry:
logger.error(f"Export key {export_key} not found in registry")
return False
for job in registry[export_key]:
if job['job_id'] == job_id:
job.update(updates)
job['updated'] = datetime.now().isoformat()
save_export_registry(registry)
return True
logger.error(f"Job ID {job_id} not found for export key {export_key}")
return False
def find_export_jobs(org, project, analysis, depth):
"""Find all export jobs for a specific configuration"""
registry = load_export_registry()
export_key = get_export_key(org, project, analysis, depth)
return registry.get(export_key, [])
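Before wiring these into the workflow methods, a quick illustration of the intended lifecycle of the registry helpers; the org, project, analysis, and job values below are made up.
# Illustrative lifecycle of the registry helpers above; all values are made up.
key = register_export_job("example-org", "example-project", "20250101", 5,
                          "https://api.botify.com/v1/jobs/12345", "12345")
update_export_job("example-org", "example-project", "20250101", 5, "12345",
                  {"status": "DONE", "download_url": "https://example.com/export.zip"})
jobs = find_export_jobs("example-org", "example-project", "20250101", 5)
assert jobs and jobs[0]["status"] == "DONE"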
Let’s update the methods to use the registry for checking existing jobs and preventing duplicates:
# pipulate/plugins/50_botify_export.py
async def step_04(self, request):
"""
Display the CSV export form with field selection options
"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
step_id = "step_04"
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else None
pipeline_id = db.get("pipeline_id", "unknown")
state = pip.read_state(pipeline_id)
step_data = pip.get_step_data(pipeline_id, step_id, {})
user_val = step_data.get(step.done, "")
# Handle finalized state
finalize_data = pip.get_step_data(pipeline_id, "finalize", {})
if "finalized" in finalize_data:
# For finalized state, show the download link if available
download_url = step_data.get('download_url')
local_file = step_data.get('local_file')
if local_file and Path(local_file).exists():
download_msg = f"🔒 {step.show}: CSV file downloaded to {local_file}"
elif download_url:
download_msg = f"🔒 {step.show}: Ready for download"
else:
download_msg = f"🔒 {step.show}: Job ID {user_val.split('/')[-1] if user_val else 'Unknown'}"
return Div(
Card(download_msg),
Div(id=next_step_id, hx_get=f"/{self.app_name}/{next_step_id}", hx_trigger="load")
)
# If step is complete and not being reverted, show revert control with download info
if user_val and state.get("_revert_target") != step_id:
job_id = user_val.split("/")[-1] if user_val else "Unknown"
download_url = step_data.get('download_url')
local_file = step_data.get('local_file')
# Display different message based on download status
if local_file and Path(local_file).exists():
display_msg = f"{step.show}: CSV file downloaded (Job ID {job_id})"
elif download_url:
display_msg = f"{step.show}: Ready for download (Job ID {job_id})"
else:
# Poll the job status to check if it's complete
try:
api_token = self.read_api_token()
is_complete, download_url, _ = await self.poll_job_status(user_val, api_token)
if is_complete and download_url:
# Update the state with the download URL
state[step_id]['download_url'] = download_url
state[step_id]['status'] = 'DONE'
pip.write_state(pipeline_id, state)
# Also update the registry
if 'org' in step_data and 'project' in step_data and 'analysis' in step_data and 'depth' in step_data:
self.update_export_job(
step_data['org'],
step_data['project'],
step_data['analysis'],
step_data['depth'],
job_id,
{'status': 'DONE', 'download_url': download_url}
)
display_msg = f"{step.show}: Ready for download (Job ID {job_id})"
else:
display_msg = f"{step.show}: Processing (Job ID {job_id})"
except Exception:
display_msg = f"{step.show}: Job ID {job_id}"
revert_control = pip.revert_control(
step_id=step_id,
app_name=app_name,
message=display_msg,
steps=steps
)
# Add a download button if the file is ready but not yet downloaded
if download_url and not (local_file and Path(local_file).exists()):
download_button = Form(
Button("Download CSV", type="submit", cls="secondary outline"),
hx_post=f"/{app_name}/download_csv",
hx_target=f"#{step_id}",
hx_swap="outerHTML",
hx_vals=f'{{"pipeline_id": "{pipeline_id}"}}'
)
return Div(
revert_control,
download_button,
Div(id=next_step_id, hx_get=f"/{app_name}/{next_step_id}", hx_trigger="load"),
id=step_id
)
return Div(
revert_control,
Div(id=next_step_id, hx_get=f"/{app_name}/{next_step_id}", hx_trigger="load"),
id=step_id
)
# Get data from previous steps
step_01_data = pip.get_step_data(pipeline_id, "step_01", {})
step_02_data = pip.get_step_data(pipeline_id, "step_02", {})
step_03_data = pip.get_step_data(pipeline_id, "step_03", {})
org = step_01_data.get('org')
project = step_01_data.get('project')
analysis = step_02_data.get('analysis')
depth = step_03_data.get('depth')
# Check if we have all required data
if not all([org, project, analysis, depth]):
return P("Missing required data from previous steps. Please complete all steps first.",
style=pip.get_style("error"))
# Check for existing jobs in the registry
existing_jobs = self.find_export_jobs(org, project, analysis, depth)
completed_jobs = [job for job in existing_jobs if job['status'] == 'DONE' and job.get('download_url')]
processing_jobs = [job for job in existing_jobs if job['status'] == 'PROCESSING']
# Check if any processing jobs are actually done
if processing_jobs:
api_token = self.read_api_token()
for job in processing_jobs:
job_url = job['job_url']
job_id = job['job_id']
try:
is_complete, download_url, _ = await self.poll_job_status(job_url, api_token)
if is_complete and download_url:
self.update_export_job(
org, project, analysis, depth, job_id,
{'status': 'DONE', 'download_url': download_url}
)
completed_jobs.append({**job, 'status': 'DONE', 'download_url': download_url})
processing_jobs.remove(job)
except Exception as e:
logger.error(f"Error checking job status: {e}")
# Look for completed jobs with local files
downloaded_jobs = []
for job in completed_jobs:
if job.get('local_file') and Path(job['local_file']).exists():
downloaded_jobs.append(job)
# Check if an export with these parameters already exists
existing_path = Path("downloads") / org / project / analysis
existing_files = list(existing_path.glob(f"*depth_{depth}*.csv")) if existing_path.exists() else []
if downloaded_jobs:
# We have a downloaded job - offer to use it
job = downloaded_jobs[0]
local_file = job['local_file']
file_path = Path(local_file)
await self.message_queue.add(pip, f"Found existing downloaded export: {file_path.name}", verbatim=True)
return Div(
Card(
H4(f"{pip.fmt(step_id)}: {step.show}"),
P(f"An export for project '{project}', analysis '{analysis}' at depth {depth} has already been downloaded:",
style="margin-bottom: 0.5rem;"),
P(f"File: {file_path.name}", style="margin-bottom: 1rem;"),
P(f"Path: {file_path}", style="margin-bottom: 1rem;"),
Div(
Button("Use Existing Download", type="button", cls="primary",
hx_post=f"/{app_name}/use_existing_export",
hx_target=f"#{step.id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}", "file_path": "{file_path}"}}'
),
Button("Create New Export", type="button",
hx_get=f"/{app_name}/{step.id}/new",
hx_target=f"#{step.id}"
),
style="display: flex; gap: 1rem;"
)
),
Div(id=next_step_id),
id=step.id
)
elif completed_jobs:
# We have a completed job but no download - offer to download it
job = completed_jobs[0]
job_id = job['job_id']
download_url = job['download_url']
await self.message_queue.add(pip, f"Found existing completed export (Job ID: {job_id})", verbatim=True)
return Div(
Card(
H4(f"{pip.fmt(step_id)}: {step.show}"),
P(f"An export for project '{project}', analysis '{analysis}' at depth {depth} is ready to download:",
style="margin-bottom: 1rem;"),
P(f"Job ID: {job_id}", style="margin-bottom: 0.5rem;"),
Div(
Button("Download Ready Export", type="button", cls="primary",
hx_post=f"/{app_name}/download_ready_export",
hx_target=f"#{step.id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}", "job_id": "{job_id}", "download_url": "{download_url}"}}'
),
Button("Create New Export", type="button",
hx_get=f"/{app_name}/{step.id}/new",
hx_target=f"#{step.id}"
),
style="display: flex; gap: 1rem;"
)
),
Div(id=next_step_id),
id=step.id
)
elif processing_jobs:
# We have a processing job - show its status
job = processing_jobs[0]
job_id = job['job_id']
job_url = job['job_url']
created = job.get('created', 'Unknown')
# Try to parse the timestamp
try:
created_dt = datetime.fromisoformat(created)
created_str = created_dt.strftime("%Y-%m-%d %H:%M:%S")
except:
created_str = created
await self.message_queue.add(
pip,
f"Found existing export job in progress (Job ID: {job_id}, Started: {created_str})",
verbatim=True
)
return Div(
Card(
H4(f"{pip.fmt(step_id)}: {step.show}"),
P(f"An export for project '{project}', analysis '{analysis}' at depth {depth} is already processing:",
style="margin-bottom: 1rem;"),
P(f"Job ID: {job_id}", style="margin-bottom: 0.5rem;"),
P(f"Started: {created_str}", style="margin-bottom: 0.5rem;"),
Progress(value="60", max="100", style="width: 100%; margin-bottom: 1rem;"),
P("The export job is still processing. Please wait for it to complete.",
style="color: #666; margin-bottom: 1rem;"),
Div(
Button("Check Status", type="button", cls="primary",
hx_post=f"/{app_name}/check_export_status",
hx_target=f"#{step.id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}", "job_id": "{job_id}", "job_url": "{job_url}"}}'
),
Button("Create New Export", type="button",
hx_get=f"/{app_name}/{step.id}/new",
hx_target=f"#{step.id}"
),
style="display: flex; gap: 1rem;"
)
),
Div(id=next_step_id),
id=step.id
)
elif existing_files:
# Found existing files on disk but not in registry
existing_file = existing_files[0]
await self.message_queue.add(pip, f"Found existing export file: {existing_file.name}", verbatim=True)
return Div(
Card(
H4(f"{pip.fmt(step_id)}: {step.show}"),
P(f"An export file for project '{project}', analysis '{analysis}' at depth {depth} was found on disk:",
style="margin-bottom: 0.5rem;"),
P(f"File: {existing_file.name}", style="margin-bottom: 1rem;"),
Div(
Button("Use Existing File", type="button", cls="primary",
hx_post=f"/{app_name}/use_existing_export",
hx_target=f"#{step.id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}", "file_path": "{existing_file}"}}'
),
Button("Create New Export", type="button",
hx_get=f"/{app_name}/{step.id}/new",
hx_target=f"#{step.id}"
),
style="display: flex; gap: 1rem;"
)
),
Div(id=next_step_id),
id=step.id
)
await self.message_queue.add(pip, self.step_messages[step_id]["input"], verbatim=True)
# Create the field selection form
return Div(
Card(
H4(f"{pip.fmt(step_id)}: Configure {step.show}"),
P(f"Export URLs up to depth {depth} from the {analysis} analysis.",
style="margin-bottom: 1rem;"),
P("Select additional fields to include in the export:",
style="margin-bottom: 0.5rem;"),
Form(
Div(
Label(
Input(type="checkbox", name="include_title", value="true", checked=True),
" Include page titles",
style="display: block; margin-bottom: 0.5rem;"
),
Label(
Input(type="checkbox", name="include_meta_desc", value="true", checked=True),
" Include meta descriptions",
style="display: block; margin-bottom: 0.5rem;"
),
Label(
Input(type="checkbox", name="include_h1", value="true", checked=True),
" Include H1 headings",
style="display: block; margin-bottom: 1rem;"
),
style="margin-bottom: 1.5rem;"
),
Button("Start Export", type="submit", cls="primary"),
P("Note: Large exports may take several minutes to process.",
style="font-size: 0.8em; color: #666; margin-top: 0.5rem;"),
hx_post=f"/{app_name}/{step.id}_submit",
hx_target=f"#{step.id}"
)
),
Div(id=next_step_id),
id=step.id
)
Let’s fix the file; I think the editor removed too much of it. Let’s add back the removed methods:
# pipulate/plugins/50_botify_export.py
# --- Finalization & Unfinalization ---
async def finalize(self, request):
"""
Finalize the workflow, locking all steps from further changes.
This method handles both GET requests (displaying finalization UI) and
POST requests (performing the actual finalization). The UI portions
are intentionally kept WET to allow for full customization of the user
experience, while core state management is handled by DRY helper methods.
Customization Points:
- GET response: The cards/UI shown before finalization
- Confirmation message: What the user sees after finalizing
- Any additional UI elements or messages
Args:
request: The HTTP request object
Returns:
UI components for either the finalization prompt or confirmation
"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
pipeline_id = db.get("pipeline_id", "unknown")
finalize_step = steps[-1]
finalize_data = pip.get_step_data(pipeline_id, finalize_step.id, {})
logger.debug(f"Pipeline ID: {pipeline_id}")
logger.debug(f"Finalize step: {finalize_step}")
logger.debug(f"Finalize data: {finalize_data}")
# ───────── GET REQUEST: FINALIZATION UI (INTENTIONALLY WET) ─────────
if request.method == "GET":
if finalize_step.done in finalize_data:
logger.debug("Pipeline is already finalized")
return Card(
H4("Workflow is locked."),
# P(f"Pipeline is finalized. Use {pip.UNLOCK_BUTTON_LABEL} to make changes."),
Form(
Button(pip.UNLOCK_BUTTON_LABEL, type="submit", cls="secondary outline"),
hx_post=f"/{app_name}/unfinalize",
hx_target=f"#{app_name}-container",
hx_swap="outerHTML"
),
id=finalize_step.id
)
# Check if all previous steps are complete
non_finalize_steps = steps[:-1]
all_steps_complete = all(
pip.get_step_data(pipeline_id, step.id, {}).get(step.done)
for step in non_finalize_steps
)
logger.debug(f"All steps complete: {all_steps_complete}")
if all_steps_complete:
return Card(
H4("All steps complete. Finalize?"),
P("You can revert to any step and make changes.", style="font-size: 0.9em; color: #666;"),
# P("All data is saved. Lock it in?"),
Form(
Button("Finalize", type="submit", cls="primary"),
hx_post=f"/{app_name}/finalize",
hx_target=f"#{app_name}-container",
hx_swap="outerHTML"
),
id=finalize_step.id
)
else:
return Div(P("Nothing to finalize yet."), id=finalize_step.id)
# ───────── END GET REQUEST ─────────
else:
# ───────── POST REQUEST: PERFORM FINALIZATION ─────────
# Update state using DRY helper
await pip.finalize_workflow(pipeline_id)
# ───────── CUSTOM FINALIZATION UI (INTENTIONALLY WET) ─────────
# Send a confirmation message
await self.message_queue.add(pip, "Workflow successfully finalized! Your data has been saved and locked.", verbatim=True)
# Return the updated UI
return pip.rebuild(app_name, steps)
# ───────── END CUSTOM FINALIZATION UI ─────────
async def unfinalize(self, request):
"""
Unfinalize the workflow, allowing steps to be modified again.
This method removes the finalization flag from the workflow state
and displays a confirmation message to the user. The core state
management is handled by a DRY helper method, while the UI generation
is intentionally kept WET for customization.
Customization Points:
- Confirmation message: What the user sees after unfinalizing
- Any additional UI elements or actions
Args:
request: The HTTP request object
Returns:
UI components showing the workflow is unlocked
"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
pipeline_id = db.get("pipeline_id", "unknown")
# Update state using DRY helper
await pip.unfinalize_workflow(pipeline_id)
# ───────── CUSTOM UNFINALIZATION UI (INTENTIONALLY WET) ─────────
# Send a message informing them they can revert to any step
await self.message_queue.add(pip, "Workflow unfinalized! You can now revert to any step and make changes.", verbatim=True)
# Return the rebuilt UI
return pip.rebuild(app_name, steps)
# ───────── END CUSTOM UNFINALIZATION UI ─────────
def run_all_cells(self, steps, app_name):
"""Generate placeholders for all steps through Pipulate helper."""
return self.pipulate.run_all_cells(app_name, steps)
async def jump_to_step(self, request):
"""
Jump to a specific step in the workflow.
This method updates the step_id in the database and rebuilds the UI
to show the workflow from the selected step.
Args:
request: The HTTP request object containing the step_id
Returns:
FastHTML components showing the workflow from the selected step
"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
form = await request.form()
step_id = form.get("step_id")
db["step_id"] = step_id
return pip.rebuild(app_name, steps)
async def get_suggestion(self, step_id, state):
"""
Get a suggestion value for a step based on transform function.
If the step has a transform function, use the previous step's output
to generate a suggested value. This enables data to flow naturally
from one step to the next, creating a connected workflow experience.
Args:
step_id: The ID of the step to generate a suggestion for
state: The current workflow state
Returns:
str: The suggested value or empty string if not applicable
"""
pip, db, steps = self.pipulate, self.db, self.steps
# If a transform function exists, use the previous step's output.
step = next((s for s in steps if s.id == step_id), None)
if not step or not step.transform:
return ""
prev_index = self.steps_indices[step_id] - 1
if prev_index < 0:
return ""
prev_step_id = steps[prev_index].id
prev_step = steps[prev_index]
prev_data = pip.get_step_data(db["pipeline_id"], prev_step_id, {})
prev_word = prev_data.get(prev_step.done, "")
return step.transform(prev_word) if prev_word else ""
async def handle_revert(self, request):
"""
Handle reverting to a previous step in the workflow.
This method clears state data from the specified step forward,
marks the step as the revert target in the state, and rebuilds
the workflow UI. It allows users to go back and modify their
inputs at any point in the workflow process.
Args:
request: The HTTP request object containing the step_id
Returns:
FastHTML components representing the rebuilt workflow UI
"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
form = await request.form()
step_id = form.get("step_id")
pipeline_id = db.get("pipeline_id", "unknown")
if not step_id:
return P("Error: No step specified", style=pip.get_style("error"))
await pip.clear_steps_from(pipeline_id, step_id, steps)
state = pip.read_state(pipeline_id)
state["_revert_target"] = step_id
pip.write_state(pipeline_id, state)
message = await pip.get_state_message(pipeline_id, steps, self.step_messages)
await self.message_queue.add(pip, message, verbatim=True)
return pip.rebuild(app_name, steps)
def read_api_token(self):
"""Read Botify API token from local file."""
try:
token_file = Path("botify_token.txt")
if not token_file.exists():
raise FileNotFoundError("botify_token.txt not found")
token = token_file.read_text().splitlines()[0].strip()
return token
except Exception as e:
raise ValueError(f"Error reading API token: {e}")
async def fetch_analyses(self, org, project, api_token):
"""Fetch analysis slugs for a given project from Botify API."""
url = f"https://api.botify.com/v1/analyses/{org}/{project}/light"
headers = {"Authorization": f"Token {api_token}"}
slugs = []
async with httpx.AsyncClient() as client:
try:
while url:
response = await client.get(url, headers=headers)
response.raise_for_status()
data = response.json()
slugs.extend(a['slug'] for a in data.get('results', []))
url = data.get('next')
return slugs
except httpx.RequestError as e:
raise ValueError(f"Error fetching analyses: {e}")
async def get_urls_by_depth(self, org, project, analysis, api_key):
"""
Fetches URL counts aggregated by depth from the Botify API.
Returns a dictionary {depth: count} or an empty {} on error.
"""
api_url = f"https://api.botify.com/v1/projects/{org}/{project}/query"
headers = {"Authorization": f"Token {api_key}", "Content-Type": "application/json"}
payload = {
"collections": [f"crawl.{analysis}"],
"query": {
"dimensions": [f"crawl.{analysis}.depth"],
"metrics": [{"field": f"crawl.{analysis}.count_urls_crawl"}],
"sort": [{"field": f"crawl.{analysis}.depth", "order": "asc"}]
}
}
try:
async with httpx.AsyncClient() as client:
response = await client.post(api_url, headers=headers, json=payload, timeout=120.0)
response.raise_for_status()
results = response.json().get("results", [])
depth_distribution = {}
for row in results:
if "dimensions" in row and len(row["dimensions"]) == 1 and \
"metrics" in row and len(row["metrics"]) == 1:
depth = row["dimensions"][0]
count = row["metrics"][0]
if isinstance(depth, int):
depth_distribution[depth] = count
return depth_distribution
except Exception as e:
logger.error(f"Error fetching URL depths: {str(e)}")
return {}
async def calculate_max_safe_depth(self, org, project, analysis, api_key):
"""Calculate maximum depth that keeps cumulative URLs under 1M and return count details"""
depth_distribution = await self.get_urls_by_depth(org, project, analysis, api_key)
if not depth_distribution:
return None, None, None
cumulative_sum = 0
sorted_depths = sorted(depth_distribution.keys())
for i, depth in enumerate(sorted_depths):
prev_sum = cumulative_sum
cumulative_sum += depth_distribution[depth]
if cumulative_sum >= 1_000_000:
safe_depth = depth - 1
safe_count = prev_sum
next_depth_count = cumulative_sum
return safe_depth, safe_count, next_depth_count
# If we never hit 1M, return the max depth and its cumulative count
return max(sorted_depths), cumulative_sum, None
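As a quick sanity check on the cumulative-threshold logic in calculate_max_safe_depth, here is the same arithmetic on a made-up depth distribution:
# Made-up depth distribution: depth 3 pushes the cumulative total past 1M,
# so the maximum safe depth is 2, with 400,051 URLs accumulated at that depth.
depth_distribution = {0: 1, 1: 50, 2: 400_000, 3: 700_000, 4: 2_000_000}
cumulative = 0
for depth in sorted(depth_distribution):
    prev = cumulative
    cumulative += depth_distribution[depth]
    if cumulative >= 1_000_000:
        print(depth - 1, prev, cumulative)  # -> 2 400051 1100051
        break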
Now, let’s update the step_04_submit method to integrate with the registry:
# pipulate/plugins/50_botify_export.py
async def step_04_submit(self, request):
"""Handle the submission of the CSV export options and start the export job"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
step_id = "step_04"
step_index = self.steps_indices[step_id]
step = steps[step_index]
pipeline_id = db.get("pipeline_id", "unknown")
# Handle finalized state using helper
if step.done == 'finalized':
return await pip.handle_finalized_step(pipeline_id, step_id, steps, app_name, self)
# Get form data
form = await request.form()
include_title = form.get("include_title") == "true"
include_meta_desc = form.get("include_meta_desc") == "true"
include_h1 = form.get("include_h1") == "true"
# Get data from previous steps
state = pip.read_state(pipeline_id)
step_01_data = pip.get_step_data(pipeline_id, "step_01", {})
step_02_data = pip.get_step_data(pipeline_id, "step_02", {})
step_03_data = pip.get_step_data(pipeline_id, "step_03", {})
org = step_01_data.get('org')
project = step_01_data.get('project')
analysis = step_02_data.get('analysis')
depth = step_03_data.get('depth')
# Check if we have all required data
if not all([org, project, analysis, depth]):
return P("Missing required data from previous steps.", style=pip.get_style("error"))
# Check if there's already an export job with these parameters
existing_jobs = self.find_export_jobs(org, project, analysis, depth)
completed_jobs = [job for job in existing_jobs if job['status'] == 'DONE' and job.get('download_url')]
processing_jobs = [job for job in existing_jobs if job['status'] == 'PROCESSING']
# Check if any processing jobs have completed
if processing_jobs:
api_token = self.read_api_token()
for job in processing_jobs[:]: # Use a copy for iteration
job_url = job['job_url']
job_id = job['job_id']
try:
is_complete, download_url, _ = await self.poll_job_status(job_url, api_token)
if is_complete and download_url:
# Update the job status in the registry
self.update_export_job(
org, project, analysis, depth, job_id,
{'status': 'DONE', 'download_url': download_url}
)
# Move from processing to completed
job_with_url = {**job, 'status': 'DONE', 'download_url': download_url}
completed_jobs.append(job_with_url)
processing_jobs.remove(job)
except Exception as e:
logger.error(f"Error checking job status: {e}")
# If there's a completed job, offer to download it instead of starting a new one
if completed_jobs:
job = completed_jobs[0]
job_id = job['job_id']
download_url = job['download_url']
# Update the state with the existing job information
if step_id not in state:
state[step_id] = {}
state[step_id].update({
'job_url': job['job_url'],
'job_id': job_id,
'org': org,
'project': project,
'analysis': analysis,
'depth': depth,
'download_url': download_url,
'status': 'DONE',
'include_fields': {
'title': include_title,
'meta_desc': include_meta_desc,
'h1': include_h1
}
})
# Store the job URL as the "done" field
state[step_id][step.done] = job['job_url']
# Write the state
pip.write_state(pipeline_id, state)
# Send message about reusing the existing export
await self.message_queue.add(
pip,
f"Using existing export job (ID: {job_id}).\n"
f"This job has already completed and is ready for download.",
verbatim=True
)
# Create response UI for the existing job
return Div(
Card(
H4(f"Export Status: Complete ✅"),
P(f"Using existing export job ID: {job_id}", style="margin-bottom: 0.5rem;"),
P(f"The export with your requested parameters is ready for download.",
style="margin-bottom: 1rem;"),
Form(
Button("Download CSV", type="submit", cls="primary"),
hx_post=f"/{app_name}/download_csv",
hx_target=f"#{step_id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}"}}'
)
),
pip.create_step_navigation(step_id, step_index, steps, app_name, job['job_url']),
id=step_id
)
# If there's a processing job, show its status instead of starting a new one
if processing_jobs:
job = processing_jobs[0]
job_id = job['job_id']
job_url = job['job_url']
# Update the state with the existing job information
if step_id not in state:
state[step_id] = {}
state[step_id].update({
'job_url': job_url,
'job_id': job_id,
'org': org,
'project': project,
'analysis': analysis,
'depth': depth,
'status': 'PROCESSING',
'include_fields': {
'title': include_title,
'meta_desc': include_meta_desc,
'h1': include_h1
}
})
# Store the job URL as the "done" field
state[step_id][step.done] = job_url
# Write the state
pip.write_state(pipeline_id, state)
# Send message about the existing processing job
created = job.get('created', 'Unknown')
try:
created_dt = datetime.fromisoformat(created)
created_str = created_dt.strftime("%Y-%m-%d %H:%M:%S")
except:
created_str = created
await self.message_queue.add(
pip,
f"Using existing export job (ID: {job_id}).\n"
f"This job is still processing (started: {created_str}).",
verbatim=True
)
# Create response UI for the processing job
return Div(
Card(
H4(f"Export Status: Processing ⏳"),
P(f"Using existing export job ID: {job_id}", style="margin-bottom: 0.5rem;"),
P(f"Started: {created_str}", style="margin-bottom: 0.5rem;"),
Progress(value="60", max="100", style="width: 100%; margin-bottom: 1rem;"),
P("The export job is still processing. Please check back later.",
style="color: #666; margin-bottom: 1rem;"),
Form(
Button("Check Status", type="submit", cls="secondary"),
hx_post=f"/{app_name}/check_export_status",
hx_target=f"#{step_id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}", "job_url": "{job_url}", "job_id": "{job_id}"}}'
)
),
pip.create_step_navigation(step_id, step_index, steps, app_name, job_url),
id=step_id
)
# No existing jobs - start a new one
try:
# Prepare fields to include
include_fields = {
'title': include_title,
'meta_desc': include_meta_desc,
'h1': include_h1
}
# Read API token
api_token = self.read_api_token()
# Create download directory
download_dir = await self.create_download_directory(org, project, analysis)
# Initiate the export job
job_url, error = await self.initiate_export_job(
org, project, analysis, depth, include_fields, api_token
)
if error:
return P(f"Error initiating export: {error}", style=pip.get_style("error"))
# Extract job ID for display
job_id = job_url.split("/")[-1]
# Register the export job in the registry
self.register_export_job(org, project, analysis, depth, job_url, job_id)
# Do a quick poll to see if the job completed very quickly
is_complete, download_url, poll_error = await self.poll_job_status(job_url, api_token)
# Update the registry if the job completed right away
if is_complete and download_url:
self.update_export_job(
org, project, analysis, depth, job_id,
{'status': 'DONE', 'download_url': download_url}
)
# Store job info in state
if step_id not in state:
state[step_id] = {}
state[step_id].update({
'job_url': job_url,
'job_id': job_id,
'org': org,
'project': project,
'analysis': analysis,
'depth': depth,
'download_url': download_url if is_complete else None,
'status': 'DONE' if is_complete else 'PROCESSING',
'download_path': str(download_dir),
'include_fields': include_fields
})
# Store the job URL as the "done" field
state[step_id][step.done] = job_url
# Write the complete state back
pip.write_state(pipeline_id, state)
# Prepare status message
if is_complete:
status_msg = f"Export completed! Job ID: {job_id}\nThe export is ready for download."
else:
status_msg = f"Export job started with Job ID: {job_id}\nThe export is processing and may take several minutes."
# Send message
await self.message_queue.add(pip, status_msg, verbatim=True)
# Create response UI
status_display = "Complete ✅" if is_complete else "Processing ⏳"
result_card = Card(
H4(f"Export Status: {status_display}"),
P(f"Job ID: {job_id}", style="margin-bottom: 0.5rem;"),
P(f"Exporting URLs up to depth {depth}", style="margin-bottom: 0.5rem;"),
P(f"Including fields: " +
", ".join([k for k, v in include_fields.items() if v]),
style="margin-bottom: 1rem;")
)
if is_complete:
# If the job completed right away, show download button
download_button = Form(
Button("Download CSV", type="submit", cls="primary"),
hx_post=f"/{app_name}/download_csv",
hx_target=f"#{step_id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}"}}'
)
return Div(
result_card,
download_button,
pip.create_step_navigation(step_id, step_index, steps, app_name, job_url),
id=step_id
)
else:
# Otherwise show processing message
return Div(
result_card,
P("Please check back in a few minutes for download options.",
style="color: #666; margin-bottom: 1rem;"),
Form(
Button("Check Status", type="submit", cls="secondary"),
hx_post=f"/{app_name}/check_export_status",
hx_target=f"#{step_id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}", "job_url": "{job_url}", "job_id": "{job_id}"}}'
),
pip.create_step_navigation(step_id, step_index, steps, app_name, job_url),
id=step_id
)
except Exception as e:
logger.error(f"Error in export submission: {str(e)}")
return P(f"An error occurred: {str(e)}", style=pip.get_style("error"))
The changes weren’t fully applied. Let’s try again:
# pipulate/plugins/50_botify_export.py
async def step_04_submit(self, request):
"""Handle the submission of the CSV export options and start the export job"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
step_id = "step_04"
step_index = self.steps_indices[step_id]
step = steps[step_index]
pipeline_id = db.get("pipeline_id", "unknown")
# Handle finalized state using helper
if step.done == 'finalized':
return await pip.handle_finalized_step(pipeline_id, step_id, steps, app_name, self)
# Get form data
form = await request.form()
include_title = form.get("include_title") == "true"
include_meta_desc = form.get("include_meta_desc") == "true"
include_h1 = form.get("include_h1") == "true"
# Get data from previous steps
state = pip.read_state(pipeline_id)
step_01_data = pip.get_step_data(pipeline_id, "step_01", {})
step_02_data = pip.get_step_data(pipeline_id, "step_02", {})
step_03_data = pip.get_step_data(pipeline_id, "step_03", {})
org = step_01_data.get('org')
project = step_01_data.get('project')
analysis = step_02_data.get('analysis')
depth = step_03_data.get('depth')
# Check if we have all required data
if not all([org, project, analysis, depth]):
return P("Missing required data from previous steps.", style=pip.get_style("error"))
# Check if there's already an export job with these parameters
existing_jobs = self.find_export_jobs(org, project, analysis, depth)
completed_jobs = [job for job in existing_jobs if job['status'] == 'DONE' and job.get('download_url')]
processing_jobs = [job for job in existing_jobs if job['status'] == 'PROCESSING']
# Check if any processing jobs have completed
if processing_jobs:
api_token = self.read_api_token()
for job in processing_jobs[:]: # Use a copy for iteration
job_url = job['job_url']
job_id = job['job_id']
try:
is_complete, download_url, _ = await self.poll_job_status(job_url, api_token)
if is_complete and download_url:
# Update the job status in the registry
self.update_export_job(
org, project, analysis, depth, job_id,
{'status': 'DONE', 'download_url': download_url}
)
# Move from processing to completed
job_with_url = {**job, 'status': 'DONE', 'download_url': download_url}
completed_jobs.append(job_with_url)
processing_jobs.remove(job)
except Exception as e:
logger.error(f"Error checking job status: {e}")
# If there's a completed job, offer to download it instead of starting a new one
if completed_jobs:
job = completed_jobs[0]
job_id = job['job_id']
download_url = job['download_url']
# Update the state with the existing job information
if step_id not in state:
state[step_id] = {}
state[step_id].update({
'job_url': job['job_url'],
'job_id': job_id,
'org': org,
'project': project,
'analysis': analysis,
'depth': depth,
'download_url': download_url,
'status': 'DONE',
'include_fields': {
'title': include_title,
'meta_desc': include_meta_desc,
'h1': include_h1
}
})
# Store the job URL as the "done" field
state[step_id][step.done] = job['job_url']
# Write the state
pip.write_state(pipeline_id, state)
# Send message about reusing the existing export
await self.message_queue.add(
pip,
f"Using existing export job (ID: {job_id}).\n"
f"This job has already completed and is ready for download.",
verbatim=True
)
# Create response UI for the existing job
return Div(
Card(
H4(f"Export Status: Complete ✅"),
P(f"Using existing export job ID: {job_id}", style="margin-bottom: 0.5rem;"),
P(f"The export with your requested parameters is ready for download.",
style="margin-bottom: 1rem;"),
Form(
Button("Download CSV", type="submit", cls="primary"),
hx_post=f"/{app_name}/download_csv",
hx_target=f"#{step_id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}"}}'
)
),
pip.create_step_navigation(step_id, step_index, steps, app_name, job['job_url']),
id=step_id
)
# If there's a processing job, show its status instead of starting a new one
if processing_jobs:
job = processing_jobs[0]
job_id = job['job_id']
job_url = job['job_url']
# Update the state with the existing job information
if step_id not in state:
state[step_id] = {}
state[step_id].update({
'job_url': job_url,
'job_id': job_id,
'org': org,
'project': project,
'analysis': analysis,
'depth': depth,
'status': 'PROCESSING',
'include_fields': {
'title': include_title,
'meta_desc': include_meta_desc,
'h1': include_h1
}
})
# Store the job URL as the "done" field
state[step_id][step.done] = job_url
# Write the state
pip.write_state(pipeline_id, state)
# Send message about the existing processing job
created = job.get('created', 'Unknown')
try:
created_dt = datetime.fromisoformat(created)
created_str = created_dt.strftime("%Y-%m-%d %H:%M:%S")
except:
created_str = created
await self.message_queue.add(
pip,
f"Using existing export job (ID: {job_id}).\n"
f"This job is still processing (started: {created_str}).",
verbatim=True
)
# Create response UI for the processing job
return Div(
Card(
H4(f"Export Status: Processing ⏳"),
P(f"Using existing export job ID: {job_id}", style="margin-bottom: 0.5rem;"),
P(f"Started: {created_str}", style="margin-bottom: 0.5rem;"),
Progress(value="60", max="100", style="width: 100%; margin-bottom: 1rem;"),
P("The export job is still processing. Please check back later.",
style="color: #666; margin-bottom: 1rem;"),
Form(
Button("Check Status", type="submit", cls="secondary"),
hx_post=f"/{app_name}/check_export_status",
hx_target=f"#{step_id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}", "job_url": "{job_url}", "job_id": "{job_id}"}}'
)
),
pip.create_step_navigation(step_id, step_index, steps, app_name, job_url),
id=step_id
)
# No existing jobs - start a new one
try:
# Prepare fields to include
include_fields = {
'title': include_title,
'meta_desc': include_meta_desc,
'h1': include_h1
}
# Read API token
api_token = self.read_api_token()
# Create download directory
download_dir = await self.create_download_directory(org, project, analysis)
# Initiate the export job
job_url, error = await self.initiate_export_job(
org, project, analysis, depth, include_fields, api_token
)
if error:
return P(f"Error initiating export: {error}", style=pip.get_style("error"))
# Extract job ID for display
job_id = job_url.split("/")[-1]
# Register the export job in the registry
self.register_export_job(org, project, analysis, depth, job_url, job_id)
# Do a quick poll to see if the job completed very quickly
is_complete, download_url, poll_error = await self.poll_job_status(job_url, api_token)
# Update the registry if the job completed right away
if is_complete and download_url:
self.update_export_job(
org, project, analysis, depth, job_id,
{'status': 'DONE', 'download_url': download_url}
)
# Store job info in state
if step_id not in state:
state[step_id] = {}
state[step_id].update({
'job_url': job_url,
'job_id': job_id,
'org': org,
'project': project,
'analysis': analysis,
'depth': depth,
'download_url': download_url if is_complete else None,
'status': 'DONE' if is_complete else 'PROCESSING',
'download_path': str(download_dir),
'include_fields': include_fields
})
# Store the job URL as the "done" field
state[step_id][step.done] = job_url
# Write the complete state back
pip.write_state(pipeline_id, state)
# Prepare status message
if is_complete:
status_msg = f"Export completed! Job ID: {job_id}\nThe export is ready for download."
else:
status_msg = f"Export job started with Job ID: {job_id}\nThe export is processing and may take several minutes."
# Send message
await self.message_queue.add(pip, status_msg, verbatim=True)
# Create response UI
status_display = "Complete ✅" if is_complete else "Processing ⏳"
result_card = Card(
H4(f"Export Status: {status_display}"),
P(f"Job ID: {job_id}", style="margin-bottom: 0.5rem;"),
P(f"Exporting URLs up to depth {depth}", style="margin-bottom: 0.5rem;"),
P(f"Including fields: " +
", ".join([k for k, v in include_fields.items() if v]),
style="margin-bottom: 1rem;")
)
if is_complete:
# If the job completed right away, show download button
download_button = Form(
Button("Download CSV", type="submit", cls="primary"),
hx_post=f"/{app_name}/download_csv",
hx_target=f"#{step_id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}"}}'
)
return Div(
result_card,
download_button,
pip.create_step_navigation(step_id, step_index, steps, app_name, job_url),
id=step_id
)
else:
# Otherwise show processing message
return Div(
result_card,
P("Please check back in a few minutes for download options.",
style="color: #666; margin-bottom: 1rem;"),
Form(
Button("Check Status", type="submit", cls="secondary"),
hx_post=f"/{app_name}/check_export_status",
hx_target=f"#{step_id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}", "job_url": "{job_url}", "job_id": "{job_id}"}}'
),
pip.create_step_navigation(step_id, step_index, steps, app_name, job_url),
id=step_id
)
except Exception as e:
logger.error(f"Error in export submission: {str(e)}")
return P(f"An error occurred: {str(e)}", style=pip.get_style("error"))
Now, let’s add the check_export_status and download_ready_export methods:
# pipulate/plugins/50_botify_export.py
async def check_export_status(self, request):
"""
Check the status of an export job and update the UI
"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
step_id = "step_04"
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else None
# Get form data
form = await request.form()
pipeline_id = form.get("pipeline_id")
job_url = form.get("job_url")
job_id = form.get("job_id")
if not all([pipeline_id, job_url, job_id]):
return P("Missing required parameters", style=pip.get_style("error"))
# Get state data
state = pip.read_state(pipeline_id)
step_data = pip.get_step_data(pipeline_id, step_id, {})
# Check if the job is complete
try:
api_token = self.read_api_token()
is_complete, download_url, error = await self.poll_job_status(job_url, api_token)
if is_complete and download_url:
# Update the state with the download URL
state[step_id]['download_url'] = download_url
state[step_id]['status'] = 'DONE'
pip.write_state(pipeline_id, state)
# Update the registry
org = step_data.get('org')
project = step_data.get('project')
analysis = step_data.get('analysis')
depth = step_data.get('depth')
if all([org, project, analysis, depth]):
self.update_export_job(
org, project, analysis, depth, job_id,
{'status': 'DONE', 'download_url': download_url}
)
# Send success message
await self.message_queue.add(
pip,
f"Export job completed! Job ID: {job_id}\n"
f"The export is ready for download.",
verbatim=True
)
# Return the download button
return Div(
Card(
H4("Export Status: Complete ✅"),
P(f"Job ID: {job_id}", style="margin-bottom: 0.5rem;"),
P(f"The export is ready for download.", style="margin-bottom: 1rem;"),
Form(
Button("Download CSV", type="submit", cls="primary"),
hx_post=f"/{app_name}/download_csv",
hx_target=f"#{step_id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}"}}'
)
),
pip.create_step_navigation(step_id, step_index, steps, app_name, job_url),
id=step_id
)
else:
# Job is still processing
created = step_data.get('created', state[step_id].get('created', 'Unknown'))
try:
created_dt = datetime.fromisoformat(created)
created_str = created_dt.strftime("%Y-%m-%d %H:%M:%S")
except:
created_str = created
# Send message about still processing
await self.message_queue.add(
pip,
f"Export job is still processing (Job ID: {job_id}).\n"
f"Please check back later.",
verbatim=True
)
return Div(
Card(
H4("Export Status: Processing ⏳"),
P(f"Job ID: {job_id}", style="margin-bottom: 0.5rem;"),
P(f"Started: {created_str}", style="margin-bottom: 0.5rem;"),
Progress(value="60", max="100", style="width: 100%; margin-bottom: 1rem;"),
P("The export job is still processing. Please check back later.",
style="color: #666; margin-bottom: 1rem;"),
Form(
Button("Check Status Again", type="submit", cls="secondary"),
hx_post=f"/{app_name}/check_export_status",
hx_target=f"#{step_id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}", "job_url": "{job_url}", "job_id": "{job_id}"}}'
)
),
pip.create_step_navigation(step_id, step_index, steps, app_name, job_url),
id=step_id
)
except Exception as e:
logger.error(f"Error checking job status: {str(e)}")
return P(f"Error checking export status: {str(e)}", style=pip.get_style("error"))
async def download_ready_export(self, request):
"""
Handle downloading an existing export job
"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
step_id = "step_04"
step_index = self.steps_indices[step_id]
step = steps[step_index]
# Get form data
form = await request.form()
pipeline_id = form.get("pipeline_id")
job_id = form.get("job_id")
download_url = form.get("download_url")
if not all([pipeline_id, job_id, download_url]):
return P("Missing required parameters", style=pip.get_style("error"))
# Get state data
state = pip.read_state(pipeline_id)
step_data = pip.get_step_data(pipeline_id, step_id, {})
# Extract necessary information from state
org = step_data.get('org')
project = step_data.get('project')
analysis = step_data.get('analysis')
depth = step_data.get('depth', '0')
# Update state with the download URL if it's not already set
if step_id not in state:
state[step_id] = {}
state[step_id].update({
'job_id': job_id,
'download_url': download_url,
'status': 'DONE',
'org': org,
'project': project,
'analysis': analysis,
'depth': depth
})
# Set a dummy job URL if none exists
if step.done not in state[step_id]:
state[step_id][step.done] = f"existing://{job_id}"
pip.write_state(pipeline_id, state)
# Show download progress UI
try:
# Create download directory
download_dir = await self.create_download_directory(org, project, analysis)
# Generate filename
include_fields = step_data.get('include_fields', {})
fields_suffix = "_".join(k for k, v in include_fields.items() if v) or "url_only"
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{org}_{project}_{analysis}_depth_{depth}_{fields_suffix}_{timestamp}.csv"
local_file_path = download_dir / filename
# Start downloading with progress feedback
await self.message_queue.add(pip, f"Starting download from {download_url}", verbatim=True)
# Show download progress
return Div(
Card(
H4("Downloading CSV File"),
P(f"Downloading export to {local_file_path}", style="margin-bottom: 1rem;"),
Progress(value="10", max="100", style="width: 100%;"),
P("Please wait, this may take a few minutes for large files...",
style="font-size: 0.9em; color: #666;")
),
hx_get=f"/{app_name}/download_progress",
hx_trigger="load",
hx_target=f"#{step_id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}", "download_url": "{download_url}", "local_file": "{local_file_path}"}}',
id=step_id
)
except Exception as e:
logger.error(f"Error preparing download: {str(e)}")
return P(f"Error preparing download: {str(e)}", style=pip.get_style("error"))
Now let’s add the routes for these new methods:
# pipulate/plugins/50_botify_export.py
# Add routes for CSV export functionality
app.route(f"/{app_name}/download_csv", methods=["POST"])(self.download_csv)
app.route(f"/{app_name}/download_progress")(self.download_progress)
app.route(f"/{app_name}/use_existing_export", methods=["POST"])(self.use_existing_export)
app.route(f"/{app_name}/step_04/new")(self.step_04_new)
app.route(f"/{app_name}/check_export_status", methods=["POST"])(self.check_export_status)
app.route(f"/{app_name}/download_ready_export", methods=["POST"])(self.download_ready_export)
Let’s update the download methods to update the registry:
# pipulate/plugins/50_botify_export.py
async def download_progress(self, request):
"""
Handle the actual file download and show progress
"""
pip, db, steps, app_name = self.pipulate, self.db, self.steps, self.app_name
step_id = "step_04"
step_index = self.steps_indices[step_id]
step = steps[step_index]
next_step_id = steps[step_index + 1].id if step_index < len(steps) - 1 else None
# Get request data
pipeline_id = request.query_params.get("pipeline_id")
download_url = request.query_params.get("download_url")
local_file = request.query_params.get("local_file")
if not all([pipeline_id, download_url, local_file]):
return P("Missing required parameters", style=pip.get_style("error"))
local_file_path = Path(local_file)
try:
# Perform the actual download
api_token = self.read_api_token()
headers = {"Authorization": f"Token {api_token}"}
# Create parent directories if they don't exist
local_file_path.parent.mkdir(parents=True, exist_ok=True)
# Download the file using httpx with streaming
async with httpx.AsyncClient() as client:
async with client.stream("GET", download_url, headers=headers, follow_redirects=True) as response:
response.raise_for_status()
# Get content length if available
total_size = int(response.headers.get('content-length', 0))
# Open file for writing
with open(local_file_path, 'wb') as f:
async for chunk in response.aiter_bytes(chunk_size=8192):
f.write(chunk)
# Check if file needs to be unzipped
if local_file_path.suffix.lower() in ('.zip', '.gz'):
await self.message_queue.add(pip, f"Extracting {local_file_path}", verbatim=True)
extracted_path = None
# Unzip the file (implementation depends on file type)
if local_file_path.suffix.lower() == '.zip':
import zipfile
with zipfile.ZipFile(local_file_path, 'r') as zip_ref:
# Get the first CSV file in the archive
csv_files = [f for f in zip_ref.namelist() if f.lower().endswith('.csv')]
if not csv_files:
return P("No CSV files found in the downloaded ZIP archive", style=pip.get_style("error"))
# Extract the CSV file
csv_file = csv_files[0]
extracted_path = local_file_path.parent / csv_file
zip_ref.extract(csv_file, local_file_path.parent)
elif local_file_path.suffix.lower() == '.gz':
import gzip
import shutil
# Assume it's a .csv.gz file and create a .csv file
extracted_path = local_file_path.with_suffix('')
with gzip.open(local_file_path, 'rb') as f_in:
with open(extracted_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
# Update the local file path to the extracted file if available
if extracted_path:
local_file_path = extracted_path
# Update state with local file path
state = pip.read_state(pipeline_id)
step_data = pip.get_step_data(pipeline_id, step_id, {})
job_id = step_data.get('job_id')
org = step_data.get('org')
project = step_data.get('project')
analysis = step_data.get('analysis')
depth = step_data.get('depth')
state[step_id]['local_file'] = str(local_file_path)
pip.write_state(pipeline_id, state)
# Update the registry if we have a job_id
if job_id and all([org, project, analysis, depth]):
self.update_export_job(
org, project, analysis, depth, job_id,
{'local_file': str(local_file_path)}
)
# Get file information
file_size_bytes = local_file_path.stat().st_size
file_size_mb = file_size_bytes / (1024 * 1024)
# Send success message
await self.message_queue.add(
pip,
f"Successfully downloaded and prepared CSV file:\n"
f"Path: {local_file_path}\n"
f"Size: {file_size_mb:.2f} MB",
verbatim=True
)
# Return success UI with file information
return Div(
pip.revert_control(
step_id=step_id,
app_name=app_name,
message=f"{step.show}: CSV downloaded ({file_size_mb:.2f} MB)",
steps=steps
),
Div(id=next_step_id, hx_get=f"/{app_name}/{next_step_id}", hx_trigger="load"),
id=step_id
)
except Exception as e:
return Div(
Card(
H4("Download Error"),
P(f"Error downloading CSV file: {str(e)}", style=pip.get_style("error")),
P(f"Download URL: {download_url}"),
P(f"Target file: {local_file_path}"),
Button("Try Again", type="button", cls="primary",
hx_post=f"/{app_name}/download_csv",
hx_target=f"#{step_id}",
hx_vals=f'{{"pipeline_id": "{pipeline_id}"}}')
),
id=step_id
)
Now let’s create the downloads directory for the registry file:
cd ~/repos/pipulate && mkdir -p downloads
Let’s restart the server to apply all our changes:
cd ~/repos/pipulate && pkill -f "python server.py" || true && nix develop .#quiet --command python server.py
I’ve implemented comprehensive improvements to make the Botify export workflow more idempotent and robust:
- Central Export Registry
  - Created a JSON-based registry that tracks all export jobs across workflow instances
  - Stores key metadata like job URLs, status, and download locations
  - Prevents duplicate exports for the same parameters
- Intelligent Job Status Management
  - Automatically detects existing export jobs for the same parameters
  - Enables resuming in-progress exports instead of starting new ones
  - Checks job status and updates UI appropriately
- Enhanced User Experience
  - Displays appropriate UI based on job status (processing, ready for download, downloaded)
  - Provides helpful status messages and progress indicators
  - Offers options to reuse existing downloads or create new exports
- Improved Error Handling
  - Better error recovery for network issues or API failures
  - File extraction for compressed downloads (.zip, .gz)
  - Verification of downloaded files
These changes make the workflow much more resilient - now when you request an export that has already been started or completed, the system will:
- Detect the existing job first
- For completed jobs: Offer to download the ready file
- For in-progress jobs: Show status and polling options
- For downloaded files: Reuse them without initiating new exports
The most expensive part (the API export request) now only happens when truly necessary, and all progress is tracked centrally to avoid redundant operations.
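To make the registry concrete, here is a minimal sketch of the kind of helpers Claude generated. The signatures are inferred from how they are called in the code above (register_export_job, update_export_job, find_export_jobs) and from the downloads/export_registry.json path; the actual generated code implements these as methods on the workflow class and may differ in details.

```python
# Sketch only: the generated plugin implements these as methods on the
# BotifyExport class; signatures are inferred from how they are called above.
import json
from datetime import datetime
from pathlib import Path

REGISTRY_PATH = Path("downloads/export_registry.json")


def _key(org, project, analysis, depth):
    """One registry bucket per unique combination of export parameters."""
    return f"{org}/{project}/{analysis}/depth_{depth}"


def load_export_registry():
    return json.loads(REGISTRY_PATH.read_text()) if REGISTRY_PATH.exists() else {}


def save_export_registry(registry):
    REGISTRY_PATH.parent.mkdir(parents=True, exist_ok=True)
    REGISTRY_PATH.write_text(json.dumps(registry, indent=2))


def register_export_job(org, project, analysis, depth, job_url, job_id):
    """Record a newly started export so later runs can find and reuse it."""
    registry = load_export_registry()
    registry.setdefault(_key(org, project, analysis, depth), []).append({
        "job_id": job_id,
        "job_url": job_url,
        "status": "PROCESSING",
        "download_url": None,
        "created": datetime.now().isoformat(),
    })
    save_export_registry(registry)


def find_export_jobs(org, project, analysis, depth):
    """Return every known job (any status) for these parameters."""
    return load_export_registry().get(_key(org, project, analysis, depth), [])


def update_export_job(org, project, analysis, depth, job_id, updates):
    """Merge new fields (e.g. status, download_url, local_file) into a job."""
    registry = load_export_registry()
    for job in registry.get(_key(org, project, analysis, depth), []):
        if job["job_id"] == job_id:
            job.update(updates)
    save_export_registry(registry)
```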
Agent Mode Over Engineering Allowed!
There were a friggin' 1.2K lines of code added right there. Clearly Claude found a housekeeping registry pattern using JSON files that it liked. I figured the AI was so on a roll I'd let it run and see where it went, and clicking around, it seems basically competent. Very surprising! Next steps: seek clarity on what it did!
Me: Wowsers! I've never seen anything like that. At first I thought I would stop you based on over-engineering a solution. But then I figured I'd see it through. I'm glad I did. I don't have all the nuance down yet, but I have the basic idea of the registry. I guess my question is one of general philosophy here regarding the polling downloads. I don't seem to see them. After "Start Export", I see the "Check Status" button. Is polling occurring there in the background? I don't see any progress spinners or gauges. If you just sat on that screen, would "Check Status" automatically switch to "Download CSV"? Maybe you could explain it in a sort of instruction-manual way? I think all the mechanics are there, but there may be some user-interface ambiguity to purge, niceties to add, and wow-factor stuff to impress the user with that remains. But don't dive into implementation of anything new yet. Make me understand it and we can plan out next steps.
Claude:
Botify Export Workflow: User Guide
Current Functionality Explained
You’re right that the polling mechanism isn’t as obvious or automatic as it could be. Here’s how it currently works:
Step 1: Initiating the Export
When you press “Start Export”, the system:
- Submits the export job to Botify API
- Registers the job in the central registry
- Does a single immediate poll to see if it completed instantly (rare but possible)
- Shows either “Complete ✅” or “Processing ⏳” status
Step 2: During Processing
- No automatic background polling happens at this stage
- The “Check Status” button is your manual way to poll the Botify API
- Each time you click it, the system checks if the job is complete
- The UI doesn’t update automatically - it waits for your interaction
Step 3: When Complete
- Only after clicking “Check Status” and getting a “complete” response, the UI will change
- The “Check Status” button gets replaced with a “Download CSV” button
- This transition isn’t automatic - it requires your action
What’s Missing and Could Be Improved
- Automatic Background Polling:
  - The system could periodically check status without user clicks
  - A WebSocket or SSE connection could push updates to the UI
- Visual Progress Indicators:
  - The progress bar is static (always at 60%) - it doesn't reflect actual progress
  - No real-time status updates or estimated time remaining
- Transition Feedback:
  - No clear indication when a job transitions from "processing" to "ready"
  - No notifications or alerts when exports complete
How to Use It Currently
- Start your export with desired parameters
- Wait a few minutes (large exports can take 5-20+ minutes depending on site size)
- Click “Check Status” periodically to see if it’s complete
- Once complete, the “Download CSV” button will appear
- Click to download the file to your local system
- The file will be saved in the organized directory structure
Central Registry Benefits
Even with these UI limitations, the registry system provides major benefits:
- Prevents duplicate job submissions for the same parameters
- Records all export jobs across workflow instances
- Remembers where files were saved locally
- Allows you to resume interrupted workflows without losing track of jobs
Would you like me to plan some specific UI/UX improvements to make the process more intuitive and user-friendly? I have several ideas for enhancing the user experience while keeping the solid foundation we’ve built.
An Attempt At Better User Feedback On Download
Me: Wow, I am rather stunned and pleased. This may be my first truly positive Agent-mode experience yet. You can see from the overarching system how I find over-engineering reprehensible. And that’s what I thought you were doing. But this is an ideal situation for a lightweight registry separate from everything else, and slipped right there into the WET Workflow. Wow! Okay, I think I approve this.
Now, this is an HTMX environment, and the type of visual-feedback polling that you propose in "What's Missing and Could Be Improved" is spot-on and exactly the sort of thing that should be achievable. There should absolutely not be a "Check Status" and a "Download CSV" button; those should both be assumed. And if surfing away is a problem, then when surfing back and coming into the correct place in the workflow (re-entering a key or whatever), even the polling should pick up where it left off. And if the download is available (and hasn't already been downloaded), it should immediately download.
So behavior-wise, after a "Start Export" click from Step 4, there's only the progress indicator and then a "CSV downloaded" status as far as the user is concerned. We're actually simplifying the UI/UX.
Don’t worry about the details of the progress indicator yet. You can use the PicoCSS indeterminate gauge from https://picocss.com/docs/progress for now.
But you have so much in excellent state. Don't lose it! Don't regress. Lock in everything that's working well. Use the current state as scaffolding. Only very carefully, and as unintrusively as you can, layer or blend in the better polling experience.
Claude: [All attempts flop. Apparently as tired as I am. I will make a fresh start tomorrow.]
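For concreteness, the behavior I was asking for boils down to something like this: a status card that re-polls the existing check_export_status endpoint on a timer and shows PicoCSS's indeterminate progress bar (a Progress element with no value attribute). This is only a sketch of the idea, not code that shipped that night; it assumes the same FastHTML components and HTMX attributes the plugin already uses.

```python
# Sketch only: an auto-polling status card, not the shipped implementation.
from fasthtml.common import Card, Div, H4, P, Progress


def processing_card(app_name, step_id, pipeline_id, job_url, job_id):
    """A status card that re-polls check_export_status every 10 seconds.

    When the server sees the job is DONE it returns the download UI instead,
    and because the polled element gets swapped out, the polling stops itself.
    """
    return Div(
        Card(
            H4("Export Status: Processing ⏳"),
            P(f"Job ID: {job_id}"),
            Progress(),  # no value attribute = PicoCSS indeterminate bar
        ),
        hx_post=f"/{app_name}/check_export_status",
        hx_trigger="every 10s",  # poll without any user clicks
        hx_target=f"#{step_id}",
        hx_swap="outerHTML",
        hx_vals=f'{{"pipeline_id": "{pipeline_id}", "job_url": "{job_url}", "job_id": "{job_id}"}}',
        id=step_id,
    )
```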
Gemini 2.5’s Thoughts On Today’s Ending State
Okay, I've reviewed the Python code you provided for the Botify CSV Export workflow (plugins/50_botify_export.py, assuming the filename based on conventions).
This code represents a much more complete implementation than the initial request in the article, covering not just the URL parsing (Step 1) but also:
- Fetching analysis slugs via API (Step 2)
- Calculating max safe click depth via API (Step 3)
- Initiating, polling, and downloading export jobs via API (Step 4)
- Managing an export job registry (downloads/export_registry.json)
- Handling existing downloaded files and jobs in progress.
Here’s a breakdown of the implementation against the patterns and requirements:
Alignment with Previous Plan & Codebase Patterns:
- Core Structure: The code correctly follows the Pipulate workflow pattern established by 20_hello_workflow.py. It defines a class (BotifyExport), sets constants (APP_NAME, DISPLAY_NAME, etc.), defines Step tuples, and implements landing, init, step_XX, step_XX_submit, finalize, unfinalize, and handle_revert methods.
- Step 1 (URL Parsing):
  - The step_01 GET handler correctly displays the input form.
  - The step_01_submit handler correctly uses the external parse_botify_url function, validates the input, handles ValueError, stores the canonical URL (parsed_data['url']) as the step.done field, and stores the full parsed dictionary (parsed_data) including org and project within the step_01 state key. This successfully implements the requirement to store multiple pieces of data while marking the step complete based on the canonical URL.
  - It correctly uses the pipulate helpers for state management and navigation generation.
- Step 2 (Analysis Selection):
  - Introduces API interaction (Workspace_analyses) which requires an API token (read_api_token).
  - Correctly populates a dropdown (Select) with available analysis slugs.
  - Handles potential errors during API fetch and displays them.
  - The submit handler saves the chosen analysis slug.
- Step 3 (Max Depth Calculation):
  - Introduces more complex API interaction (get_urls_by_depth via calculate_max_safe_depth).
  - Displays the calculated depth and explanation.
  - Uses a hidden input field in the form to pass the calculated depth to the submit handler, which is a clean way to handle non-user-inputted data progression.
- Step 4 (CSV Export):
  - This step is significantly more complex than the others, handling multiple states: checking for existing jobs/files, initiating new jobs, polling status, and triggering downloads.
  - It correctly uses the export registry functions (load/save/register/update/find) to manage job state persistently across different workflow runs.
  - It introduces several new routes and handlers (download_csv, download_progress, etc.) specific to this step's functionality, which is appropriate.
  - The download logic correctly handles .zip and .gz files.
- Helper Functions: The added helper functions (read_api_token, Workspace_analyses, API query/export functions, registry functions, directory creation) encapsulate the specific logic for Botify interaction and file management well.
- WET Principle: The code maintains the WET approach for workflows: the logic for each step, including API calls and specific UI generation, is largely contained within that step's methods in this file.
Potential Issues & Suggestions:
- API Token Management (read_api_token):
  - Security/Flexibility: Reading from a hardcoded filename (botify_token.txt) in the root directory is simple but potentially insecure and inflexible. Consider alternatives like environment variables, storing per profile (maybe encrypted), or a dedicated config section in the UI.
  - Error Handling: If botify_token.txt is missing or unreadable, the FileNotFoundError or other exceptions are caught, but the raised ValueError might still cause the step to fail without clear UI feedback. It would be better to catch this in the step handler (e.g., step_02) and return a P(...) tag explaining that the token is missing/invalid.
- API Error Handling (UI): When functions like Workspace_analyses, initiate_export_job, and poll_job_status encounter HTTP errors or other issues, they often raise exceptions or return error strings/None. The step handlers (step_02, step_03, step_04, etc.) should explicitly check for these error conditions and return user-friendly error messages within the step's UI using P(error_message, style=pip.get_style("error")).
- Download Progress Feedback (download_progress): Displaying a static progress bar (Progress(value="10", ...)) while downloading isn't very informative for large files. Implementing real-time progress updates is complex with the current setup. A simpler improvement: replace the initial progress bar with a message like "Downloading… please wait" and only update the UI card after the download (and any unzipping) is fully complete or fails, showing the final status (e.g., "Download complete: file.csv (XX MB)" or "Download failed: error").
- Export Registry Robustness: The JSON file (export_registry.json) is simple but could potentially become corrupted or face race conditions in edge cases (though less likely in this single-user context). It serves its purpose well for now.
- Polling Mechanism (poll_job_status): The current implementation polls only once when the status is checked (either explicitly via the "Check Status" button or implicitly when rendering Step 4). For long-running jobs, the user has to manually click "Check Status" repeatedly. A more advanced solution could involve background polling or using HTMX's polling mechanism (hx-trigger="every 10s") on the status display element, but this adds complexity. The current manual check is functional.
- Unzipping Assumptions: The unzip logic makes reasonable assumptions (first CSV in the zip, .gz contains a .csv) but could be made more robust if needed (e.g., handling multiple CSVs in a zip, checking file magic numbers).
Conclusion:
This is a well-developed implementation of the Botify CSV export workflow that significantly expands upon the initial requirements outlined in the article. It successfully integrates API interactions and file handling into the Pipulate workflow pattern. The code structure is logical, and it leverages the existing framework components appropriately.
The primary areas for potential future improvement revolve around enhancing user feedback for API errors and the download process, and potentially making API token management more flexible and secure. However, as it stands, this code provides a strong foundation for the desired functionality.
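On that last point, the more flexible token lookup Gemini suggests is easy to picture: prefer an environment variable and fall back to the existing botify_token.txt file, raising an error the step handler can catch and surface in the UI. A small sketch, with the caveat that the BOTIFY_API_TOKEN variable name is an assumption, not an existing convention in the codebase:

```python
# Sketch only: BOTIFY_API_TOKEN is an assumed variable name, not an existing convention.
import os
from pathlib import Path


def read_api_token(token_file: str = "botify_token.txt") -> str:
    """Prefer an environment variable, fall back to the token file."""
    token = os.environ.get("BOTIFY_API_TOKEN", "").strip()
    if not token and Path(token_file).exists():
        token = Path(token_file).read_text().strip()
    if not token:
        # Raise something the step handler can catch and render as a P(...) error
        raise ValueError(
            "No Botify API token found: set BOTIFY_API_TOKEN or create botify_token.txt"
        )
    return token
```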
AI Analysis
Title/Headline Ideas:
- Building Pipulate: Integrating Botify’s CSV Export API Step-by-Step
- From Zero to Botify CSV Download: A Pipulate Workflow Development Log
- Navigating Botify’s API for Large Exports: BQL, Limits, and Polling in Python
- AI-Assisted Development: Comparing ChatGPT, Gemini, Grok, and Claude on a Botify Integration Task
- Pipulate & Botify: A Developer’s Journey Through API Integration and Workflow Design
Strengths:
- Detailed Process Documentation: Captures the nuances, challenges, and decision-making process of software development in real-time.
- Practical Code Examples: Provides concrete Python snippets for interacting with the Botify API (parsing URLs, BQL queries, polling, downloading).
- AI Interaction Showcase: Offers a unique comparison of how different large language models approach the same coding problem, including their varying levels of detail and correctness.
- Real-World Problem Solving: Addresses genuine challenges faced when dealing with external APIs, such as rate limits, asynchronous operations, and API fragility.
- Transparency: The “warts and all” approach, including admitting struggles and showing iterative refinement (like the polling mechanism), adds authenticity.
Weaknesses:
- Over-reliance on AI Dialogue: Large sections consist of direct AI outputs, which can make the core technical narrative and final implemented logic harder to follow without careful reading.
- Potential Obsolescence: The specific AI models and their capabilities/limitations mentioned might become outdated relatively quickly.
- Complexity Creep: The introduction of the “export registry” pattern, while potentially useful, feels like a significant architectural addition introduced late in the process and might contradict earlier stated goals of simplicity within the WET workflow philosophy (at least as presented solely within this text).
- Lack of Polling Clarity (Initially): The development of the polling mechanism shows confusion and iteration, which, while realistic, makes the final, intended user experience unclear until the very end discussion with Claude.
AI Opinion:
This article offers significant value as a detailed, practical log of developing a specific, non-trivial software feature involving external API integration (Botify) and heavy AI assistance. Its strength lies in documenting the iterative process, the specific technical hurdles encountered (like BQL intricacies, API limits, state management in a custom framework), and comparing AI assistant outputs. While its raw, journal-style format makes it dense and potentially requires prior context about Pipulate or Botify for full comprehension, it serves as an excellent case study for developers working on similar integrations or those interested in the practical application and comparison of different AI coding tools. The later sections detailing the Botify API modes and the export registry add considerable technical depth.