Skip to content

agent_k.agents.lobbyist

The LOBBYIST discovery agent module.

agent_k.agents.lobbyist

Lobbyist agent - competition discovery for AGENT-K.

@notice: | Lobbyist agent - competition discovery for AGENT-K.

@dev: | See module for implementation details and extension points.

@graph: id: agent_k.agents.lobbyist provides: - agent_k.agents.lobbyist:LobbyistAgent - agent_k.agents.lobbyist:LobbyistDeps - agent_k.agents.lobbyist:LobbyistSettings - agent_k.agents.lobbyist:DiscoveryResult - agent_k.agents.lobbyist:lobbyist_agent consumes: - agent_k.core.protocols:PlatformAdapter - agent_k.ui.agui:EventEmitter - agent_k.toolsets.kaggle:kaggle_toolset pattern: agent-singleton

@similar: - id: agent_k.agents.scientist when: "Use for research/analysis, not discovery."

@agent-guidance: do: - "Use agent_k.agents.lobbyist as the canonical home for this capability." do_not: - "Create parallel modules without updating @similar or @graph."

@human-review: last-verified: 2026-01-26 owners: - agent-k-core

(c) Mike Casale 2025. Licensed under the MIT License.

LobbyistSettings

Bases: BaseSettings

Configuration for the Lobbyist agent.

@notice: | Configuration for the Lobbyist agent.

@dev: | See module for implementation details and extension points.

@pattern:
    name: settings
    rationale: "Centralizes discovery agent configuration."
    violations: "Ad-hoc per-run overrides lead to inconsistent behavior."
Source code in agent_k/agents/lobbyist.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class LobbyistSettings(BaseSettings):
    """Configuration for the Lobbyist agent.

    @notice: |
        Configuration for the Lobbyist agent.

    @dev: |
        See module for implementation details and extension points.

        @pattern:
            name: settings
            rationale: "Centralizes discovery agent configuration."
            violations: "Ad-hoc per-run overrides lead to inconsistent behavior."
    """

    model_config = SettingsConfigDict(env_prefix="LOBBYIST_", env_file=".env", extra="ignore", validate_default=True)
    model: str = Field(default=DEFAULT_MODEL, description="Model identifier for discovery tasks")
    temperature: float = Field(default=0.2, ge=0.0, le=2.0, description="Sampling temperature for discovery prompts")
    max_tokens: int = Field(default=2048, ge=1, description="Maximum tokens for responses")
    tool_retries: int = Field(default=2, ge=0, description="Tool retry attempts")
    output_retries: int = Field(default=1, ge=0, description="Output validation retry attempts")
    max_results: int = Field(default=50, ge=1, description="Maximum competitions to return")

    @property
    def model_settings(self) -> ModelSettings:
        """Build ModelSettings for the configured model."""
        return ModelSettings(temperature=self.temperature, max_tokens=self.max_tokens)
model_settings property
model_settings: ModelSettings

Build ModelSettings for the configured model.

LobbyistDeps dataclass

Dependencies for the Lobbyist agent.

@notice: | Dependencies for the Lobbyist agent.

@dev: | See module for implementation details and extension points.

@pattern:
    name: dependency-container
    rationale: "Groups runtime services for discovery tools."
    violations: "Hidden globals make tests and tooling brittle."

@collaborators:
    required:
        - httpx:AsyncClient
        - agent_k.core.protocols:PlatformAdapter
        - agent_k.ui.agui:EventEmitter
    optional:
        - agent_k.toolsets.memory:AgentKMemoryTool
    injection: constructor
    lifecycle: "Allocated per agent run."
Source code in agent_k/agents/lobbyist.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
@dataclass
class LobbyistDeps:
    """Dependencies for the Lobbyist agent.

    @notice: |
        Dependencies for the Lobbyist agent.

    @dev: |
        See module for implementation details and extension points.

        @pattern:
            name: dependency-container
            rationale: "Groups runtime services for discovery tools."
            violations: "Hidden globals make tests and tooling brittle."

        @collaborators:
            required:
                - httpx:AsyncClient
                - agent_k.core.protocols:PlatformAdapter
                - agent_k.ui.agui:EventEmitter
            optional:
                - agent_k.toolsets.memory:AgentKMemoryTool
            injection: constructor
            lifecycle: "Allocated per agent run."
    """

    http_client: httpx.AsyncClient
    platform_adapter: PlatformAdapter
    event_emitter: EventEmitter
    search_cache: dict[str, Any] = field(default_factory=dict)

DiscoveryResult

Bases: BaseModel

Result of competition discovery.

@notice: | Result of competition discovery.

@dev: | See module for implementation details and extension points.

@pattern:
    name: output-model
    rationale: "Stable schema for discovery outputs."
    violations: "Ad-hoc dict outputs are hard to validate."
Source code in agent_k/agents/lobbyist.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
class DiscoveryResult(BaseModel):
    """Result of competition discovery.

    @notice: |
        Result of competition discovery.

    @dev: |
        See module for implementation details and extension points.

        @pattern:
            name: output-model
            rationale: "Stable schema for discovery outputs."
            violations: "Ad-hoc dict outputs are hard to validate."
    """

    model_config = ConfigDict(frozen=True, str_strip_whitespace=True, validate_default=True)
    schema_version: str = Field(default=SCHEMA_VERSION, description="Schema version")
    competitions: list[Competition] = Field(
        default_factory=list, description="Discovered competitions matching criteria"
    )
    total_searched: int = Field(default=0, ge=0, description="Total competitions scanned")
    filters_applied: list[str] = Field(default_factory=list, description="Filters applied during discovery")

LobbyistAgent

Bases: MemoryMixin

Lobbyist agent encapsulating competition discovery functionality.

@notice: | Discovers Kaggle competitions matching mission criteria. Use the module-level lobbyist_agent or agent registry.

@dev: | Registers discovery tools on initialization and wires memory/toolsets.

@pattern: name: agent-singleton rationale: "Single instance ensures consistent memory/tool registration." violations: "Multiple instances cause duplicated tool registrations."

@collaborators: required: - agent_k.core.protocols:PlatformAdapter - agent_k.ui.agui:EventEmitter optional: - httpx:AsyncClient injection: deps via RunContext lifecycle: "Module-level singleton at import time."

@concurrency: model: asyncio safe: false reason: "Mutates internal caches and tool registrations."

@invariants: - "self.agent is initialized after __init_ completes." - "self._toolset registers discovery tools exactly once."

Source code in agent_k/agents/lobbyist.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
class LobbyistAgent(MemoryMixin):
    """Lobbyist agent encapsulating competition discovery functionality.

    @notice: |
        Discovers Kaggle competitions matching mission criteria.
        Use the module-level lobbyist_agent or agent registry.

    @dev: |
        Registers discovery tools on initialization and wires memory/toolsets.

    @pattern:
        name: agent-singleton
        rationale: "Single instance ensures consistent memory/tool registration."
        violations: "Multiple instances cause duplicated tool registrations."

    @collaborators:
        required:
            - agent_k.core.protocols:PlatformAdapter
            - agent_k.ui.agui:EventEmitter
        optional:
            - httpx:AsyncClient
        injection: deps via RunContext
        lifecycle: "Module-level singleton at import time."

    @concurrency:
        model: asyncio
        safe: false
        reason: "Mutates internal caches and tool registrations."

    @invariants:
        - "self._agent is initialized after __init__ completes."
        - "self._toolset registers discovery tools exactly once."
    """

    def __init__(self, settings: Annotated[LobbyistSettings | None, Doc("Optional settings override.")] = None) -> None:
        """Initialize the Lobbyist agent.

        @notice: |
            Builds the agent singleton and registers tools.

        @dev: |
            Initializes memory backend, toolset, and pydantic-ai Agent.

        @state-changes:
            - self._settings
            - self._toolset
            - self._agent
        """
        self._settings = settings or LobbyistSettings()
        self._toolset: FunctionToolset[LobbyistDeps] = FunctionToolset(id="lobbyist")
        self._memory_backend = self._init_memory_backend()
        self._register_tools()
        self._agent = self._create_agent()
        register_agent("lobbyist", self._agent)
        self._setup_memory()

    @property
    def agent(self) -> Agent[LobbyistDeps, DiscoveryResult]:
        """Return the underlying pydantic-ai Agent."""
        return self._agent

    @property
    def settings(self) -> LobbyistSettings:
        """Return current settings."""
        return self._settings

    async def search_kaggle_competitions(
        self,
        ctx: RunContext[LobbyistDeps],
        categories: Annotated[list[str], Doc("Competition categories to search.")],
        keywords: Annotated[list[str] | None, Doc("Optional keyword filters.")] = None,
        min_prize: Annotated[int | None, Doc("Minimum prize pool in USD."), Range(0, 1_000_000_000)] = None,
    ) -> list[dict[str, Any]]:
        """Search Kaggle for competitions matching criteria.

        @notice: |
            Queries the platform adapter for competition listings.

        @dev: |
            Emits telemetry events before and after the search.

        @effects:
            io:
                - Kaggle API requests
            state:
                - ctx.deps.search_cache
        """
        with logfire.span("lobbyist.search_kaggle", categories=categories, keywords=keywords):
            await ctx.deps.event_emitter.emit_tool_start(
                task_id="discovery_search",
                tool_call_id=f"kaggle_search_{id(ctx)}",
                tool_type="kaggle_mcp",
                operation="competitions.list",
            )

            adapter = ctx.deps.platform_adapter
            competitions: list[dict[str, Any]] = []

            async for comp in adapter.search_competitions(
                categories=categories, keywords=keywords, min_prize=min_prize, active_only=True
            ):
                competitions.append(comp.model_dump())
                ctx.deps.search_cache[comp.id] = comp

            await ctx.deps.event_emitter.emit_tool_result(
                task_id="discovery_search",
                tool_call_id=f"kaggle_search_{id(ctx)}",
                result={"count": len(competitions)},
                duration_ms=0,
            )

            return competitions

    async def get_competition_details(
        self, ctx: RunContext[LobbyistDeps], competition_id: Annotated[str, Doc("Competition identifier (slug).")]
    ) -> dict[str, Any]:
        """Get detailed information about a specific competition.

        @notice: |
            Fetches the competition details via the platform adapter.

        @effects:
            io:
                - Kaggle API request
        """
        with logfire.span("lobbyist.get_details", competition_id=competition_id):
            adapter = ctx.deps.platform_adapter
            competition = await adapter.get_competition(competition_id)
            return competition.model_dump()

    async def score_competition_fit(
        self,
        ctx: RunContext[LobbyistDeps],
        competition_id: Annotated[str, Doc("Competition identifier to score.")],
        target_domains: Annotated[list[str], Doc("Target subject domains for scoring.")],
        min_days_remaining: Annotated[int, Doc("Minimum days remaining required."), Range(0, 3650)],
        target_percentile: Annotated[float, Doc("Target percentile for fit scoring."), Range(0.0, 100.0)],
    ) -> dict[str, Any]:
        """Score how well a competition fits the mission criteria.

        @notice: |
            Computes a heuristic fit score from cached competition metadata.

        @dev: |
            Returns a structured dict with score and reasoning.

        @effects:
            state:
                - none
        """
        competition = ctx.deps.search_cache.get(competition_id)
        if not competition:
            return {"score": 0.0, "reason": "Competition not in cache"}

        score = 0.0
        reasons: list[str] = []
        if any(domain.lower() in " ".join(competition.tags).lower() for domain in target_domains):
            score += 0.4
            reasons.append("matches_domain")

        days_remaining = competition.days_remaining
        if days_remaining >= min_days_remaining:
            score += 0.3
            reasons.append("sufficient_time")

        if competition.prize_pool and competition.prize_pool >= 10000:
            score += 0.2
            reasons.append("good_prize")

        score += min(0.1, target_percentile / 100.0)
        reasons.append("target_percentile")
        return {
            "competition_id": competition_id,
            "score": round(score, 2),
            "reasons": reasons,
            "days_remaining": days_remaining,
        }

    def _create_agent(self) -> Agent[LobbyistDeps, DiscoveryResult]:
        """Create the underlying pydantic-ai agent.

        @factory-for:
            id: agent_k.agents.lobbyist:LobbyistAgent
            rationale: "Centralizes agent wiring and toolset preparation."
            singleton: true
            cache-key: "module"

        @canonical-home:
            for:
                - "lobbyist agent construction"
            notes: "Use LobbyistAgent() or module singleton."
        """
        builtin_tools: list[Any] = [prepare_web_search]
        if self._memory_backend is not None:
            builtin_tools.append(prepare_memory_tool)

        agent: Agent[LobbyistDeps, DiscoveryResult] = Agent(
            model=get_model(self._settings.model),
            deps_type=LobbyistDeps,
            output_type=DiscoveryResult,
            instructions=LOBBYIST_SYSTEM_PROMPT,
            name="lobbyist",
            model_settings=self._settings.model_settings,
            retries=self._settings.tool_retries,
            output_retries=self._settings.output_retries,
            builtin_tools=builtin_tools,
            toolsets=[
                create_production_toolset([self._toolset, cast("FunctionToolset[LobbyistDeps]", kaggle_toolset)])
            ],
            prepare_tools=universal_tool_preparation,
            instrument=True,
        )

        agent.output_validator(self._validate_discovery_result)
        agent.instructions(self._add_search_context)
        return agent

    def _register_tools(self) -> None:
        """Register all discovery tools with the toolset.

        @effects:
            state:
                - self._toolset
        """
        self._toolset.tool(self.search_kaggle_competitions)
        self._toolset.tool(self.get_competition_details)
        self._toolset.tool(self.score_competition_fit)

    async def _validate_discovery_result(
        self, ctx: RunContext[LobbyistDeps], result: DiscoveryResult
    ) -> DiscoveryResult:
        """Validate discovery results meet minimum requirements."""
        if ctx.partial_output:
            return result
        if not result.competitions:
            raise ModelRetry("No competitions found. Broaden criteria and try again.")
        return result

    async def _add_search_context(self, ctx: RunContext[LobbyistDeps]) -> str:
        """Add cached search results to context."""
        return f"Previously found competitions: {list(ctx.deps.search_cache.keys())}" if ctx.deps.search_cache else ""
__init__
__init__(settings: Annotated[LobbyistSettings | None, Doc('Optional settings override.')] = None) -> None

Initialize the Lobbyist agent.

@notice: | Builds the agent singleton and registers tools.

@dev: | Initializes memory backend, toolset, and pydantic-ai Agent.

@state-changes: - self._settings - self._toolset - self._agent

Source code in agent_k/agents/lobbyist.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def __init__(self, settings: Annotated[LobbyistSettings | None, Doc("Optional settings override.")] = None) -> None:
    """Initialize the Lobbyist agent.

    @notice: |
        Builds the agent singleton and registers tools.

    @dev: |
        Initializes memory backend, toolset, and pydantic-ai Agent.

    @state-changes:
        - self._settings
        - self._toolset
        - self._agent
    """
    self._settings = settings or LobbyistSettings()
    self._toolset: FunctionToolset[LobbyistDeps] = FunctionToolset(id="lobbyist")
    self._memory_backend = self._init_memory_backend()
    self._register_tools()
    self._agent = self._create_agent()
    register_agent("lobbyist", self._agent)
    self._setup_memory()
agent property

Return the underlying pydantic-ai Agent.

settings property
settings: LobbyistSettings

Return current settings.

search_kaggle_competitions async
search_kaggle_competitions(ctx: RunContext[LobbyistDeps], categories: Annotated[list[str], Doc('Competition categories to search.')], keywords: Annotated[list[str] | None, Doc('Optional keyword filters.')] = None, min_prize: Annotated[int | None, Doc('Minimum prize pool in USD.'), Range(0, 1000000000)] = None) -> list[dict[str, Any]]

Search Kaggle for competitions matching criteria.

@notice: | Queries the platform adapter for competition listings.

@dev: | Emits telemetry events before and after the search.

@effects: io: - Kaggle API requests state: - ctx.deps.search_cache

Source code in agent_k/agents/lobbyist.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
async def search_kaggle_competitions(
    self,
    ctx: RunContext[LobbyistDeps],
    categories: Annotated[list[str], Doc("Competition categories to search.")],
    keywords: Annotated[list[str] | None, Doc("Optional keyword filters.")] = None,
    min_prize: Annotated[int | None, Doc("Minimum prize pool in USD."), Range(0, 1_000_000_000)] = None,
) -> list[dict[str, Any]]:
    """Search Kaggle for competitions matching criteria.

    @notice: |
        Queries the platform adapter for competition listings.

    @dev: |
        Emits telemetry events before and after the search.

    @effects:
        io:
            - Kaggle API requests
        state:
            - ctx.deps.search_cache
    """
    with logfire.span("lobbyist.search_kaggle", categories=categories, keywords=keywords):
        await ctx.deps.event_emitter.emit_tool_start(
            task_id="discovery_search",
            tool_call_id=f"kaggle_search_{id(ctx)}",
            tool_type="kaggle_mcp",
            operation="competitions.list",
        )

        adapter = ctx.deps.platform_adapter
        competitions: list[dict[str, Any]] = []

        async for comp in adapter.search_competitions(
            categories=categories, keywords=keywords, min_prize=min_prize, active_only=True
        ):
            competitions.append(comp.model_dump())
            ctx.deps.search_cache[comp.id] = comp

        await ctx.deps.event_emitter.emit_tool_result(
            task_id="discovery_search",
            tool_call_id=f"kaggle_search_{id(ctx)}",
            result={"count": len(competitions)},
            duration_ms=0,
        )

        return competitions
get_competition_details async
get_competition_details(ctx: RunContext[LobbyistDeps], competition_id: Annotated[str, Doc('Competition identifier (slug).')]) -> dict[str, Any]

Get detailed information about a specific competition.

@notice: | Fetches the competition details via the platform adapter.

@effects: io: - Kaggle API request

Source code in agent_k/agents/lobbyist.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
async def get_competition_details(
    self, ctx: RunContext[LobbyistDeps], competition_id: Annotated[str, Doc("Competition identifier (slug).")]
) -> dict[str, Any]:
    """Get detailed information about a specific competition.

    @notice: |
        Fetches the competition details via the platform adapter.

    @effects:
        io:
            - Kaggle API request
    """
    with logfire.span("lobbyist.get_details", competition_id=competition_id):
        adapter = ctx.deps.platform_adapter
        competition = await adapter.get_competition(competition_id)
        return competition.model_dump()
score_competition_fit async
score_competition_fit(ctx: RunContext[LobbyistDeps], competition_id: Annotated[str, Doc('Competition identifier to score.')], target_domains: Annotated[list[str], Doc('Target subject domains for scoring.')], min_days_remaining: Annotated[int, Doc('Minimum days remaining required.'), Range(0, 3650)], target_percentile: Annotated[float, Doc('Target percentile for fit scoring.'), Range(0.0, 100.0)]) -> dict[str, Any]

Score how well a competition fits the mission criteria.

@notice: | Computes a heuristic fit score from cached competition metadata.

@dev: | Returns a structured dict with score and reasoning.

@effects: state: - none

Source code in agent_k/agents/lobbyist.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
async def score_competition_fit(
    self,
    ctx: RunContext[LobbyistDeps],
    competition_id: Annotated[str, Doc("Competition identifier to score.")],
    target_domains: Annotated[list[str], Doc("Target subject domains for scoring.")],
    min_days_remaining: Annotated[int, Doc("Minimum days remaining required."), Range(0, 3650)],
    target_percentile: Annotated[float, Doc("Target percentile for fit scoring."), Range(0.0, 100.0)],
) -> dict[str, Any]:
    """Score how well a competition fits the mission criteria.

    @notice: |
        Computes a heuristic fit score from cached competition metadata.

    @dev: |
        Returns a structured dict with score and reasoning.

    @effects:
        state:
            - none
    """
    competition = ctx.deps.search_cache.get(competition_id)
    if not competition:
        return {"score": 0.0, "reason": "Competition not in cache"}

    score = 0.0
    reasons: list[str] = []
    if any(domain.lower() in " ".join(competition.tags).lower() for domain in target_domains):
        score += 0.4
        reasons.append("matches_domain")

    days_remaining = competition.days_remaining
    if days_remaining >= min_days_remaining:
        score += 0.3
        reasons.append("sufficient_time")

    if competition.prize_pool and competition.prize_pool >= 10000:
        score += 0.2
        reasons.append("good_prize")

    score += min(0.1, target_percentile / 100.0)
    reasons.append("target_percentile")
    return {
        "competition_id": competition_id,
        "score": round(score, 2),
        "reasons": reasons,
        "days_remaining": days_remaining,
    }