Execute Ansible playbooks triggered via MQTT messages with real-time status reporting.
pip install .Or for development:
pip install -e ".[dev]"Copy config.example.yaml to config.yaml and adjust the settings:
mqtt:
host: "mqtt.example.com"
port: 1883
username: "ansible-worker"
password: "${MQTT_PASSWORD}" # Environment variable expansion supported
keepalive: 60
tls_enabled: false
worker:
group_name: "production"
playbook_directory: "/opt/ansible/playbooks"
topic_prefix: "ansible"
max_queue_size: 100
task_timeout: 3600
log_level: "INFO"# Run with default config.yaml
ansible-worker
# Run with custom config file
ansible-worker -c /path/to/config.yaml
# Validate configuration
ansible-worker --validate
# Run as a module
python -m ansible_worker -c config.yamlThe topic prefix is configurable via worker.topic_prefix (default: ansible).
The worker subscribes to receive task requests using MQTT 5.0 shared subscriptions:
- Topic:
$share/ansible-worker-<group>/<prefix>/<group>/tasks(QoS 2) - Example:
$share/ansible-worker-production/ansible/production/tasks
The $share/ prefix enables distributed task handling - the broker delivers each message to only ONE worker in the group, providing automatic load balancing.
Status updates are published to:
- Topic:
<prefix>/<group>/tasks/<task_id>/status(QoS 1, retained) - Example:
ansible/production/tasks/a1b2c3d4e5f67890/status
When running multiple workers in the same group, the MQTT broker automatically distributes tasks among them using shared subscriptions (MQTT 5.0 feature).
- All workers in a group subscribe to
$share/ansible-worker-<group>/<topic> - When a task message is published, the broker delivers it to exactly ONE worker
- The first available (idle) worker receives and processes the task
- Other workers don't see the message at all - no duplicate work
- No application-level locking - the broker handles distribution
- Automatic load balancing - idle workers get tasks first
- No duplicate execution - each task is delivered to exactly one worker
- Simple scaling - just start more workers to increase capacity
- No external dependencies - uses built-in MQTT 5.0 feature
Your MQTT broker must support MQTT 5.0 shared subscriptions:
Publish to <prefix>/<group>/tasks:
{
"task_id": "a1b2c3d4e5f67890",
"playbook": "deploy/application.yml",
"inventory": "production",
"extra_vars": {"app_version": "2.5.0"},
"limit": "webservers",
"tags": ["deploy"],
"skip_tags": [],
"verbosity": 0,
"check_mode": false,
"diff_mode": false,
"forks": 5,
"timeout": 1800,
"git_pull": false
}Required fields: task_id, playbook, inventory
| Field | Type | Default | Description |
|---|---|---|---|
task_id |
string | required | Unique identifier from the controller |
playbook |
string | required | Path to playbook relative to playbook directory |
inventory |
string | required | Inventory file or host pattern |
extra_vars |
object | {} |
Extra variables to pass to the playbook |
limit |
string | null |
Limit to specific hosts |
tags |
array | [] |
Only run tasks with these tags |
skip_tags |
array | [] |
Skip tasks with these tags |
verbosity |
int | 0 |
Verbosity level (0-4) |
check_mode |
bool | false |
Run in check mode (dry run) |
diff_mode |
bool | false |
Show diffs for changed files |
forks |
int | 5 |
Number of parallel processes |
timeout |
int | null |
Task timeout in seconds |
git_pull |
bool | false |
Run git pull in playbook directory before executing |
Published to <prefix>/<group>/tasks/<task_id>/status:
{
"task_id": "a1b2c3d4e5f67890",
"state": "running",
"created_at": "2026-01-21T10:30:00.000Z",
"started_at": "2026-01-21T10:30:05.000Z",
"completed_at": null,
"duration_seconds": 45.5,
"tasks_total": 25,
"tasks_ok": 20,
"tasks_changed": 3,
"tasks_failed": 0,
"tasks_skipped": 2,
"tasks_unreachable": 0,
"return_code": null,
"error_message": null,
"output": {}
}States: queued, running, success, failed, cancelled, timeout
Playbooks can push custom data back to the controller using Ansible's built-in set_stats module. Any data set via set_stats will appear in the output field of the task status.
Example playbook usage:
- name: create vm
community.general.proxmox_kvm:
name: my-vm
# ...
register: vm_result
- name: report vm details to controller
ansible.builtin.set_stats:
data:
vm_ip: "{{ vm_result.ip }}"
vm_id: "{{ vm_result.vmid }}"The resulting status message will include:
{
"task_id": "a1b2c3d4e5f67890",
"state": "success",
"output": {
"vm_ip": "10.0.0.50",
"vm_id": 105
}
}Multiple set_stats calls within a playbook are merged into a single output dict. Later calls overwrite earlier values for the same key.
| Error | Action |
|---|---|
| MQTT disconnect | Auto-reconnect with exponential backoff |
| Invalid JSON/missing fields | Log error, discard message |
| git pull failed | Mark task failed, publish status |
| Playbook not found | Mark task failed, publish status |
| Execution error | Mark task failed with error message |
| Queue full | Reject with failed status |
| Timeout | Mark task timeout |
# Install dev dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Type checking
mypy ansible_worker
# Linting
ruff check ansible_workerMIT