The Trickle Pattern¶
Iterative screening: add new sequences across rounds without re-predicting cached ones.
⚠️ Preview Feature: The biolmai.pipeline module used in this guide is currently in preview and not yet publicly released. Access is available to early users on request. Contact us to get access.
What you'll learn:
- Persisting predictions across multiple pipeline runs with a named DuckDBDataStore
- The anti-join cache lookup that skips already-predicted sequences
- Changing filter thresholds with zero API calls
- Querying the full campaign history
Requirements:
pip install "biolmai[pipeline]"
export BIOLMAI_TOKEN=your-token-here
Setup¶
import os
from biolmai.pipeline import (
    DataPipeline, DuckDBDataStore,
    ThresholdFilter, RankingFilter,
    ValidAminoAcidFilter, EmbeddingSpec,
    DiversitySamplingFilter,
)
TOKEN = os.environ.get("BIOLMAI_TOKEN", "")
if not TOKEN:
    raise EnvironmentError(
        "Set BIOLMAI_TOKEN before running.\n"
        "Get one at https://biolm.ai/ui/accounts/user-api-tokens/"
    )
Round 1: initial screen¶
Screen the initial batch with two models (500 sequences in practice; 10 are shown here). All predictions are cached to campaign.duckdb.
BATCH_1 = [
    "GIGKFLHSAKKFGKAFVGEIMNS", "GLFDIIKKIAESF", "KLAKLAKKLAKLAK",
    "RRWWRRWWRR", "KWKLFKKI", "GIGKFLHSAK", "RLFDKIRQ",
    "GLFDIVKKVVGALGSL", "FLPLILRKIVTAL", "KWKWKWKWKW",
]  # In practice: 500 sequences
pipeline_v1 = DataPipeline(
    sequences=BATCH_1,
    datastore=DuckDBDataStore("campaign.duckdb"),
    run_id="screen_v1",
    verbose=True,
)
pipeline_v1.add_prediction("temperature-regression", extractions="prediction",
                           columns="melting_temperature")
pipeline_v1.add_prediction("biolmsol", extractions="solubility_score",
                           columns="solubility")
pipeline_v1.add_filter(ThresholdFilter("melting_temperature", min_value=40.0))
pipeline_v1.run()
print(f"Round 1: {len(BATCH_1)} sequences predicted and cached")
Round 2: new sequences arrive¶
Feed the full combined list — old + new — into the same DB. Only new sequences hit the API.
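The cache lookup behind this behavior can be pictured as an anti-join: keep only the incoming sequences that have no matching row in the cache. The sketch below illustrates the idea with Python's built-in sqlite3 rather than DuckDB. The table names `cached` and `incoming`, the helper `uncached_sequences`, and the stored values are hypothetical and chosen for illustration; this is not the library's actual schema or implementation.

```python
import sqlite3


def uncached_sequences(conn, candidates):
    """Return the candidates that have no row in the cache table, in input order."""
    conn.execute("CREATE TEMP TABLE IF NOT EXISTS incoming (pos INTEGER, sequence TEXT)")
    conn.execute("DELETE FROM incoming")
    conn.executemany("INSERT INTO incoming VALUES (?, ?)", list(enumerate(candidates)))
    rows = conn.execute(
        """
        SELECT i.sequence
        FROM incoming AS i
        LEFT JOIN cached AS c ON c.sequence = i.sequence
        WHERE c.sequence IS NULL      -- anti-join: keep rows with no cache match
        ORDER BY i.pos
        """
    ).fetchall()
    return [r[0] for r in rows]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cached (sequence TEXT PRIMARY KEY, melting_temperature REAL)")
# Pretend round 1 already stored these two results (placeholder values, not model output).
conn.executemany(
    "INSERT INTO cached VALUES (?, ?)",
    [("RRWWRRWWRR", 48.2), ("KWKLFKKI", 51.0)],
)

round_2 = ["RRWWRRWWRR", "KWKLFKKI", "RRWQWR", "RWRWRW"]
to_predict = uncached_sequences(conn, round_2)
print(to_predict)  # ['RRWQWR', 'RWRWRW']
```

Only the two sequences missing from `cached` would need a model call; the other two are answered from stored rows.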
BATCH_2 = [
    "GIKKFLGSIWKFIKAFVKEIMN", "RRLCRIVVIRVCR", "RRWQWR",
    "RWRWRW", "FKRIVQRIKDFL", "KWKLFKKIPKFLHLAK",
]  # In practice: 200 new sequences
ALL_SEQUENCES = BATCH_1 + BATCH_2
pipeline_v2 = DataPipeline(
    sequences=ALL_SEQUENCES,
    datastore=DuckDBDataStore("campaign.duckdb"),  # same DB
    run_id="screen_v2",
    verbose=True,
)
pipeline_v2.add_prediction("temperature-regression", extractions="prediction",
                           columns="melting_temperature")
pipeline_v2.add_prediction("biolmsol", extractions="solubility_score",
                           columns="solubility")
pipeline_v2.add_filter(ThresholdFilter("melting_temperature", min_value=40.0))
pipeline_v2.run()
print(f"Round 2: only {len(BATCH_2)} new sequences hit the API; {len(BATCH_1)} served from cache")
Round 3: tighten filters, zero API calls¶
Wet-lab results suggest the Tm threshold should be higher. No new sequences are added; only the filter becomes stricter (55.0 instead of 40.0), so every prediction is served from the cache.
pipeline_v3 = DataPipeline(
    sequences=ALL_SEQUENCES,
    datastore=DuckDBDataStore("campaign.duckdb"),
    run_id="screen_v3",
    verbose=True,
)
pipeline_v3.add_prediction("temperature-regression", extractions="prediction",
                           columns="melting_temperature")
pipeline_v3.add_prediction("biolmsol", extractions="solubility_score",
                           columns="solubility")
pipeline_v3.add_filter(ThresholdFilter("melting_temperature", min_value=55.0))  # stricter
pipeline_v3.run()
pipeline_v3.summary()
Query campaign history¶
The database contains every prediction from all rounds and can be queried directly with SQL.
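The history query in this section pivots a long predictions table (one row per sequence and prediction type) into one row per sequence, using MAX(CASE WHEN ...) per output column. A minimal runnable sketch of that pivot follows. It uses Python's built-in sqlite3 instead of DuckDB and a toy schema that assumes only the columns named in the query; the real tables may contain more, and the inserted values are placeholders, not model output.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE sequences (sequence_id INTEGER PRIMARY KEY, sequence TEXT);
    CREATE TABLE predictions (sequence_id INTEGER, prediction_type TEXT, value REAL);
    INSERT INTO sequences VALUES (1, 'RRWQWR'), (2, 'RWRWRW');
    -- Placeholder values for illustration only.
    INSERT INTO predictions VALUES
        (1, 'melting_temperature', 52.0), (1, 'solubility', 0.5),
        (2, 'melting_temperature', 61.5), (2, 'solubility', 0.25);
""")

# Each CASE yields the value for one prediction type and NULL otherwise;
# MAX ignores NULLs, so it collapses each group to the single matching value.
rows = conn.execute("""
    SELECT s.sequence,
           MAX(CASE WHEN p.prediction_type = 'melting_temperature' THEN p.value END) AS tm,
           MAX(CASE WHEN p.prediction_type = 'solubility' THEN p.value END) AS solubility
    FROM sequences s
    JOIN predictions p ON s.sequence_id = p.sequence_id
    GROUP BY s.sequence
    ORDER BY tm DESC
""").fetchall()
print(rows)  # [('RWRWRW', 61.5, 0.25), ('RRWQWR', 52.0, 0.5)]
```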
ds = DuckDBDataStore("campaign.duckdb")
ds.conn.execute("""
    SELECT s.sequence,
           MAX(CASE WHEN p.prediction_type = 'melting_temperature' THEN ROUND(p.value,1) END) AS tm,
           MAX(CASE WHEN p.prediction_type = 'solubility' THEN ROUND(p.value,3) END) AS solubility
    FROM sequences s
    JOIN predictions p ON s.sequence_id = p.sequence_id
    GROUP BY s.sequence
    ORDER BY tm DESC
""").df()
Cleanup¶
ds.close()
import os; os.remove("campaign.duckdb")