# Asynchronous Workflow runs
AWF also implements a messaging infrastructure that allows third-party applications to send workflow runs and receive status notifications asynchronously using AWS SQS and SNS. A schematic representation of the architecture is shown below.
The procedure for using the asynchronous system is outlined below.
# Submitting workflows
Applications can submit workflows by sending a JSON message to the `AWFTaskQueue`.
To allow your application to submit messages to the `AWFTaskQueue`, you must:
- Provide the AWF development team with the AWS ARN of the AWS principal you wish to connect to the queue. Currently supported principals are:
  - AWS accounts (e.g. `arn:aws:iam::555555555555:root`)
  - AWS users (e.g. `arn:aws:iam::AWS-account-ID:user/UserName`)
  - AWS roles (e.g. `arn:aws:iam::AWS-account-ID:role/role-name`)
- Request the `AWFSendMessageRole` ARN and the `AWFTaskQueue` URL from the AWF development team.
- Allow your application to assume the `AWFSendMessageRole` IAM role. This can be done using the AWS SDK, CLI, etc., depending on where and how the application is run. For more info see here.
# Message structure
A strict message schema is enforced for workflows submitted via the queue. Third-party applications should use the following schema when submitting to the AWF queue:
{
  "user_name": "string",
  "job_number": "string",
  "app_id": "string",
  "AWF_workflow_slug": "tutorial",
  "AWF_workflow_arguments": [
    {
      "name": "object_id",
      "type": "string",
      "value": "my test workflow run"
    },
    {
      "name": "file",
      "type": "file",
      "value": "input.txt",
      "properties": {
        "url": "https://azure.com?signed_url",
        "mime_type": "text/plain"
      }
    }
  ]
}
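Because the schema is strict, it can help to check a message locally before sending it. The following is a minimal sketch, not part of AWF: the helper name `validate_message` is hypothetical, and treating every top-level key shown in the schema above as required is an assumption rather than a documented rule.

```python
# Top-level keys taken from the schema shown above. Treating all of them as
# required is an assumption, not a documented AWF rule.
REQUIRED_KEYS = (
    "user_name",
    "job_number",
    "app_id",
    "AWF_workflow_slug",
    "AWF_workflow_arguments",
)


def validate_message(message: dict) -> list:
    """Return a list of problems; an empty list means the shape looks right."""
    problems = []
    for key in REQUIRED_KEYS:
        if key not in message:
            problems.append(f"missing key: {key}")
    for i, arg in enumerate(message.get("AWF_workflow_arguments", [])):
        for key in ("name", "type", "value"):
            if key not in arg:
                problems.append(f"argument {i}: missing key: {key}")
        # File arguments in the schema carry a "properties" object with a URL.
        if arg.get("type") == "file" and "url" not in (arg.get("properties") or {}):
            problems.append(f"argument {i}: file argument has no properties.url")
    return problems


message = {
    "user_name": "john.doe@arup.com",
    "job_number": "00000000",
    "app_id": "my_awesome_app",
    "AWF_workflow_slug": "tutorial",
    "AWF_workflow_arguments": [
        {"name": "object_id", "type": "string", "value": "my test workflow run"},
        {
            "name": "file",
            "type": "file",
            "value": "input.txt",
            "properties": {
                "url": "https://azure.com?signed_url",
                "mime_type": "text/plain",
            },
        },
    ],
}

print(validate_message(message))
```

A check like this only catches structural mistakes early; the queue consumer remains the authority on what is accepted.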
# Example
Here is sample Python code that submits a workflow using the AWF messaging system:
import json

import boto3

# Assume the AWFSendMessageRole. Replace the placeholder with the role ARN
# provided by the AWF development team.
sts = boto3.client('sts')
role = sts.assume_role(
    RoleArn="arn:aws:iam::AWS-account-ID:role/role-name",
    RoleSessionName="my-awesome-role-session"
)

# Create an SQS client that uses the temporary credentials of the assumed role.
sqs = boto3.client(
    'sqs',
    aws_access_key_id=role['Credentials']['AccessKeyId'],
    aws_secret_access_key=role['Credentials']['SecretAccessKey'],
    aws_session_token=role['Credentials']['SessionToken']
)

# Build the message following the schema above.
message = {
    "user_name": "john.doe@arup.com",
    "job_number": "00000000",
    "app_id": "my_awesome_app",
    "AWF_workflow_slug": "tutorial",
    "AWF_workflow_arguments": [
        {
            "name": "object_id",
            "type": "string",
            "value": "my test workflow run"
        },
        {
            "name": "file",
            "type": "file",
            "value": "input.txt",
            "properties": {
                "url": "https://azure.com?signed_url",
                "mime_type": "text/plain"
            }
        }
    ]
}

# Send the message. Replace the placeholder with the AWFTaskQueue URL.
response = sqs.send_message(
    QueueUrl="AwfTaskQueueUrl",
    MessageBody=json.dumps(message)
)
# Receiving workflow notifications
Applications can also receive notifications whenever there is a status update for a workflow run.
To receive these notifications, the application must subscribe to the `AWFRunStatusUpdateTopic`.
To allow your application to receive notifications from the `AWFRunStatusUpdateTopic`, you must:
- Provide the AWF development team with the AWS ARN of the AWS principal you wish to connect to the topic. Currently supported principals are:
  - AWS accounts (e.g. `arn:aws:iam::555555555555:root`)
  - AWS users (e.g. `arn:aws:iam::AWS-account-ID:user/UserName`)
  - AWS roles (e.g. `arn:aws:iam::AWS-account-ID:role/role-name`)
- Request the `AWFRunStatusUpdateTopic` ARN from the AWF development team.
- Subscribe your application to the `AWFRunStatusUpdateTopic`. For more info see here.
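As an illustration of the last step, the sketch below prepares the arguments for the boto3 SNS `subscribe` call. It assumes the application receives notifications through its own SQS queue, which is only one of the delivery protocols SNS supports. The helper name `build_subscribe_request`, the region, and both ARNs are placeholders, not values supplied by AWF.

```python
def build_subscribe_request(topic_arn: str, queue_arn: str) -> dict:
    """Build keyword arguments for the boto3 SNS client's subscribe() call."""
    return {
        "TopicArn": topic_arn,
        "Protocol": "sqs",          # assumption: deliver to an SQS queue
        "Endpoint": queue_arn,      # ARN of the application's own queue
        "ReturnSubscriptionArn": True,
    }


request = build_subscribe_request(
    # Placeholder: use the topic ARN provided by the AWF development team.
    "arn:aws:sns:eu-west-1:555555555555:AWFRunStatusUpdateTopic",
    # Placeholder: a hypothetical queue owned by your application.
    "arn:aws:sqs:eu-west-1:555555555555:my-app-notifications",
)
print(request)

# With credentials for the principal registered with the AWF team:
#   import boto3
#   boto3.client("sns").subscribe(**request)
```

When delivering to SQS, the queue's access policy must also allow the topic to send messages to it.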
# Message structure
Here is the schema third-party apps should expect for messages coming from AWF notifications:
{
  "url": "http://testserver/api/run/1/",
  "id": "1",
  "archived": false,
  "owner": "owner",
  "status_display": "Workflow Completed With Errors",
  "arguments": null,
  "resources": [
    {
      "signed_url": "https://aws.s3",
      "url": "http://testserver/api/resource/1/",
      "name": "name",
      "type_display": "Input",
      "date_created": "2021-02-23T20:12:50.982994Z",
      "key": "key"
    }
  ],
  "job_number": null,
  "object": null,
  "status": 3,
  "date_created": "2021-02-23T20:12:50.964969Z",
  "date_updated": "2021-02-23T20:12:50.965996Z",
  "stdout": "test log",
  "stderr": "test error log",
  "metadata": {
    "app_id": "string",
    "tda_msg_key": "string",
    "tda_msg_body": {}
  },
  "workflow": "http://testserver/api/workflow/slug/",
  "worker": "http://testserver/api/worker/8131f141-5f97-49e4-af2d-d074324fe9c8/"
}
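A subscriber typically needs only a few of these fields. The sketch below shows one way to read a notification, assuming it arrives through an SQS subscription: without raw message delivery, SNS wraps the payload as a JSON string in the `Message` field of an envelope. The helper names `parse_notification` and `summarize_run` are hypothetical.

```python
import json


def parse_notification(body: str) -> dict:
    """Return the AWF run payload from a received message body."""
    data = json.loads(body)
    # Standard SNS envelope: the payload is a JSON string under "Message".
    if isinstance(data, dict) and data.get("Type") == "Notification" and "Message" in data:
        return json.loads(data["Message"])
    # Otherwise assume raw message delivery: the body is the payload itself.
    return data


def summarize_run(run: dict) -> dict:
    """Pick out the run id, readable status, and signed resource URLs."""
    return {
        "id": run["id"],
        "status": run["status_display"],
        "resource_urls": [r["signed_url"] for r in run.get("resources") or []],
    }


# Trimmed-down payload based on the schema above, wrapped in an SNS envelope.
run = {
    "id": "1",
    "status_display": "Workflow Completed With Errors",
    "resources": [{"signed_url": "https://aws.s3", "name": "name"}],
}
body = json.dumps({"Type": "Notification", "Message": json.dumps(run)})

print(summarize_run(parse_notification(body)))
```

Handling both the wrapped and the raw form keeps the consumer working if raw message delivery is later enabled on the subscription.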
# Filtering notifications
Subscribers to the `AWFRunStatusUpdateTopic` can also use the notification attributes to receive only relevant notifications.
Subscribers can filter on the following properties:
- `app_id`, a unique identifier for the third-party app. This ID is usually provided when submitting a workflow request asynchronously (see Submitting workflows).
- `workflow_slug`, the unique workflow slug.
- `status`, the AWF workflow status:
  - Workflow Created
  - Workflow Submitted
  - Workflow Assigned
  - Workflow Started
  - Workflow Completed Successfully
  - Workflow Completed With Errors
  - Workflow Stop Requested
  - Workflow Canceled
  - Workflow Terminated With Exception
Example filter policy:
{
  "workflow_slug": [
    "tutorial"
  ],
  "app_id": [
    "my_awesome_app"
  ]
}
See here for more details about filter policies.
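One way to attach such a policy to an existing subscription is the boto3 SNS `set_subscription_attributes` call, which takes the policy as a JSON string. The sketch below prepares that request and includes a small local matcher for trying a policy against sample attributes. The helper names and the subscription ARN are hypothetical, and the matcher covers only the exact-match string lists used in the example policy, not the full SNS filter syntax.

```python
import json


def build_filter_policy_request(subscription_arn: str, policy: dict) -> dict:
    """Build keyword arguments for the SNS set_subscription_attributes() call."""
    return {
        "SubscriptionArn": subscription_arn,
        "AttributeName": "FilterPolicy",
        "AttributeValue": json.dumps(policy),  # SNS expects a JSON string
    }


def matches(policy: dict, attributes: dict) -> bool:
    """Local approximation: every policy key must be present in the
    attributes with one of the allowed values (exact string match only)."""
    return all(attributes.get(key) in allowed for key, allowed in policy.items())


policy = {"workflow_slug": ["tutorial"], "app_id": ["my_awesome_app"]}

request = build_filter_policy_request(
    # Placeholder: the ARN returned when your application subscribed.
    "arn:aws:sns:eu-west-1:555555555555:AWFRunStatusUpdateTopic:subscription-id",
    policy,
)
print(request["AttributeValue"])
print(matches(policy, {"app_id": "my_awesome_app", "workflow_slug": "tutorial"}))

# With credentials for the subscribing principal:
#   import boto3
#   boto3.client("sns").set_subscription_attributes(**request)
```

The local matcher is only a convenience for testing; SNS itself evaluates the policy and supports further operators that this sketch ignores.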