Using Temporal with Python
My recent side project vindex is a semantic video indexing platform. Users upload videos, which then run through a multi-step processing pipeline that prepares and indexes them. Finally, users can search for specific locations in a video using natural language.
The processing pipeline involves extracting frames, extracting audio, transcribing the audio, and computing various embeddings for the index. I’m using Temporal to orchestrate this pipeline for fun and profit!
What is Temporal and why use it?
Temporal’s website introduces it with: “Code smart. Move fast. Break nothing. Eliminate complex error or retry logic, avoid callbacks, and ensure that every workflow you start, completes. Temporal delivers durable execution for your services and applications.”
Temporal provides a programming framework where business processes (“workflows”) are broken down into small, reusable units of code called activities.
Workflows describe the flow of activities in sequence (or in parallel), including conditions and branching. Workflow code must be deterministic – running the same workflow with the same arguments produces the same result. Temporal makes this practical by recording every activity’s inputs and outputs in the workflow’s event history, so a run can be replayed from that history after a failure.
Activities are side-effectful actions like RPC calls, running external programs, or reading/writing a database. Activities are annotated with retry policies and should have idempotent implementations.
This gives some nice properties around errors: transient failures like network errors can be retried automatically by Temporal. Programmer errors can also be treated as transient; once fixed, Temporal can retry the affected activity or workflow from its last known working state.
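For example, when a workflow invokes an activity it can attach a retry policy. A minimal sketch of a fragment from inside a workflow method, using the Python SDK’s RetryPolicy (the timeout and backoff values are illustrative, and params stands for the activity’s parameter dataclass):
from datetime import timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy

# Inside a workflow method: retry the activity with exponential
# backoff, giving up after five attempts.
result = await workflow.execute_activity(
    "my-activity",
    params,
    start_to_close_timeout=timedelta(seconds=10),
    retry_policy=RetryPolicy(
        initial_interval=timedelta(seconds=1),
        backoff_coefficient=2.0,
        maximum_attempts=5,
    ),
)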
Scaling is also handled by Temporal: you register as many workers as needed with the Temporal server, which distributes workflows and activities fairly among them.
Development
For development I use temporalite to run Temporal locally. It’s a Go program that you can build and run with:
git clone https://github.com/temporalio/temporalite.git
cd temporalite
go build ./cmd/temporalite
./temporalite start --namespace default
The app itself is written in Python 3.10, and I manage dependencies using pipenv, which works well although it’s slow.
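If you want to follow along, the dependencies used below are the Temporal Python SDK and (for the database examples) psycopg’s connection pool package:
pipenv install temporalio psycopg-pool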
Writing an activity
I decided to split each activity into two files:
- the parameters of the activity and the result type
- the implementation of the activity itself
This avoids forcing callers of the activity to transitively pull in the activity implementation’s dependencies. The Python SDK checks that modules imported from a workflow are free of import-time side effects; many useful packages needed in activities (e.g. opencv) do perform side effects on import, so keeping them out of the workflow’s transitive imports sidesteps the issue.
Example my_activity_params.py
from dataclasses import dataclass


@dataclass
class MyActivityParams:
    username: str
    media: int


@dataclass
class MyActivityResult:
    seconds: int
The Temporal Python SDK relies on Python’s dataclasses to model the parameters and return types of workflows and activities. I highly recommend starting with dataclasses from day 1 – this simplifies backwards compatibility. Once deployed to production, changes to these types should be backwards compatible (i.e. only add optional fields with defaults).
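As a sketch of what a backwards-compatible change looks like, a later release might extend the parameters with a new optional field (the language field here is hypothetical):
from typing import Optional
from dataclasses import dataclass


@dataclass
class MyActivityParams:
    username: str
    media: int
    # Added later; the default keeps payloads from old workflow
    # histories deserializable.
    language: Optional[str] = None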
Example my_activity.py
from temporalio import activity
import psycopg_pool

from myapp.activities.my_activity_params import MyActivityParams, MyActivityResult


def make_my_activity(pool: psycopg_pool.AsyncConnectionPool):
    @activity.defn(name="my-activity")
    async def impl(params: MyActivityParams) -> MyActivityResult:
        async with pool.connection() as conn:
            async with conn.cursor() as cur:
                # ... do something w/ the database ...
                return MyActivityResult(seconds=5)

    return impl
My activity modules don’t export the activity directly; instead they expose a function that constructs it. This is a useful pattern for injecting common runtime dependencies like database connection pools or configuration.
Writing a workflow
I structure workflows like activities, for the same reasons. Beyond that, the workflows follow the Python SDK documentation pretty closely.
Example my_workflow_params.py
from typing import Optional
from dataclasses import dataclass


@dataclass
class MyWorkflowParams:
    workspace: int
    username: str
    s3_location: Optional[str]
Example my_workflow.py
from datetime import timedelta

from temporalio import workflow

from myapp.activities.my_activity_params import MyActivityParams, MyActivityResult
from myapp.workflows.my_workflow_params import MyWorkflowParams


@workflow.defn(name="my-workflow")
class MyWorkflow:
    @workflow.run
    async def run(self, params: MyWorkflowParams) -> None:
        # ...
        activity_result = await workflow.execute_activity(
            "my-activity",
            MyActivityParams(username=params.username, media=1),
            # When calling an activity by name, result_type tells the
            # SDK how to deserialize the result payload.
            result_type=MyActivityResult,
            start_to_close_timeout=timedelta(seconds=10),
        )
        workflow.logger.info(f"Took {activity_result.seconds}s")
        # ...
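Because workflow code runs on the SDK’s deterministic event loop, plain asyncio also works for fan-out: running activities in parallel is just asyncio.gather over execute_activity calls. A sketch of a fragment from inside a workflow method (the activity names are illustrative, borrowed from the pipeline described above):
import asyncio
from datetime import timedelta

from temporalio import workflow

# Inside a workflow method: both activities are scheduled first,
# then awaited together.
frames, audio = await asyncio.gather(
    workflow.execute_activity(
        "extract-frames", params,
        start_to_close_timeout=timedelta(minutes=5)),
    workflow.execute_activity(
        "extract-audio", params,
        start_to_close_timeout=timedelta(minutes=5)),
)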
Putting it all together
Running workflows requires two main components: a worker process, and something that triggers (or inspects) workflows.
The worker code registers all activities and workflows it is responsible for and polls the Temporal server for new tasks (activities or workflow runs). Workers are horizontally scalable – you can run as many instances as needed.
The trigger will likely integrate into your existing app (e.g. a web server using fastapi). Launching a workflow requires a connection to the Temporal server, the workflow’s name, the corresponding parameters, and an ID. The ID is useful for idempotency and for later introspection of the workflow.
You can also isolate work into different task queues and/or namespaces – we’re using the default task queue and the default namespace here for simplicity.
Implementing the worker
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker
import psycopg_pool

from myapp.workflows.my_workflow import MyWorkflow
from myapp.activities.my_activity import make_my_activity

# ...


async def run_worker(stop_event: asyncio.Event, pool: psycopg_pool.AsyncConnectionPool):
    client = await Client.connect("127.0.0.1:7233", namespace="default")
    print("Worker launching")
    worker = Worker(
        client,
        task_queue="default",
        # Add all your workflows here
        workflows=[MyWorkflow],
        # Add all your activity builders here
        activities=[
            make_my_activity(pool),
            # ...
        ],
    )
    async with worker:
        print("Worker running")
        await stop_event.wait()
    print("Worker done")


async def main():
    pool = psycopg_pool.AsyncConnectionPool(conninfo="dbname=example")
    stop_worker = asyncio.Event()
    # Nothing sets stop_worker here, so the worker runs until the process
    # is killed; see the signal-handler sketch below for a graceful stop.
    await run_worker(stop_worker, pool)


if __name__ == "__main__":
    asyncio.run(main())
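To make the stop event actually fire, one option is to wire it to process signals; a sketch using asyncio’s signal handling (Unix only), with install_signal_handlers being a hypothetical helper you’d call at the top of main:
import asyncio
import signal


def install_signal_handlers(stop_event: asyncio.Event) -> None:
    # On Ctrl-C or SIGTERM, set the event; run_worker then leaves the
    # `async with worker` block, which shuts the worker down cleanly.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop_event.set)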
Launching a workflow from your app
import asyncio

from temporalio.client import Client

from myapp.workflows.my_workflow_params import MyWorkflowParams


async def main():
    client = await Client.connect("127.0.0.1:7233", namespace="default")
    params = MyWorkflowParams(workspace=1, username="foo", s3_location=None)
    handle = await client.start_workflow(
        "my-workflow", params, id="some-instance-id", task_queue="default")
    await handle.result()


if __name__ == "__main__":
    asyncio.run(main())
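Since the workflow ID is stable, any other process can re-attach to the same run later; a sketch of introspecting it with the client’s get_workflow_handle:
from temporalio.client import Client


async def inspect(client: Client) -> None:
    # Look the workflow up by the ID passed to start_workflow.
    handle = client.get_workflow_handle("some-instance-id")
    info = await handle.describe()
    print(info.status)  # e.g. RUNNING or COMPLETED
    # Awaiting handle.result() would instead wait for (or fetch) the result.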
Conclusion
So far, using Temporal has been great; I really like the programming model and the observability that comes with it out of the box. I’m looking forward to using it in more projects – in particular, I’m curious to build more parts of the app with Temporal and to experiment with using it to power API endpoints end-to-end.