# Asynchronous Workflow runs

AWF also implements a messaging infrastructure to allow other 3rd party applications to send and receive workflow runs asynchronously using AWS SQS and SNS. A schematic representation architecture is shown below Architecture

The procedure to make use of the asynchronous system is outlined below

# โšก๏ธ Submitting workflows

Applications can submit workflows by submitting a json message to the AWFTaskQueue. In order to allow your application to submit messages to the AWFTaskQueue users must:

  1. Provide the AWF development team (opens new window) with AWS ARN of the AWS principal (opens new window) you wish to connect to the queue. Current supported principals are:
    • AWS accounts (e.g. arn:aws:iam::555555555555:root)
    • AWS users (e.g.arn:aws:iam::AWS-account-ID:user/UserName)
    • AWS roles (e.g. arn:aws:iam::AWS-account-ID:role/role-name)
  2. Request the AWFSendMessageRole ARN and AWFTaskQueue url from the the AWF development team (opens new window)
  3. Allow your application to assume the AWFSendMessageRole IAM Role. This can be done using the AWS SDK, CLI etc. depending on where and how the application is run. For more info see here (opens new window)

# โœ‰๏ธ Message structure

In order to submit a workflow via the queue a strict message schema is enforced, Here is the schema 3rd party applications should use for submitting to the AWF queue:

{
    "user_name": "string",
    "job_number": "string",
    "app_id": "string",
    "AWF_workflow_slug": "tutorial",
    "AWF_workflow_arguments": [
        {
            "name": "object_id",  
            "type": "string",
            "value": "my test workflow run"
        },
        {
            "name": "file",
            "type": "file",
            "value": "input.txt",
            "properties":{
                "url":"https://azure.com?signed_url",
                "mime_type":"text/plain"
            }
        }
    ]
}

# Example

Here is an sample python code that submits a workflow using the AWF messaging system

import boto3, json

sts = boto3.client('sts')
role = sts.assume_role(
    RoleArn="arn:aws:iam::AWS-account-ID:role/role-name",
    RoleSessionName="my-awesome-role-session"
)

sqs = boto3.client(
    'sqs',
    aws_access_key_id=role['Credentials']['AccessKeyId'],
    aws_secret_access_key=role['Credentials']['SecretAccessKey'],
    aws_session_token=role['Credentials']['SessionToken']
)

message = {
    "user_name": "john.doe@arup.com",
    "job_number": "00000000",
    "app_id": "my_awesome_app",
    "AWF_workflow_slug": "tutorial",
    "AWF_workflow_arguments": [
        {
            "name": "object_id",  
            "type": "string",
            "value": "my test workflow run"
        },
        {
            "name": "file",
            "type": "file",
            "value": "input.txt",
            "properties":{
                "url":"https://azure.com?signed_url",
                "mime_type":"text/plain"
            }
        }
    ]
}

response = sqs.send_message(
    QueueUrl="AwfTaskQueueUrl"
    MessageBody=json.dumps(message)
)

# ๐Ÿ“ฌ Receiving workflow notifications

Applications can also receive notifications whenever there is a status update for a workflow run. In order to receive these notifications the application must subscribe to the AWFRunStatusUpdateTopic

In order to allow your application to receive notifications from the AWFRunStatusUpdateTopic users must:

  1. Provide the AWF development team (opens new window) with AWS ARN of the AWS principal (opens new window) you wish to connect to the topic. Current supported principals are:
    • AWS accounts (e.g. arn:aws:iam::555555555555:root)
    • AWS users (e.g.arn:aws:iam::AWS-account-ID:user/UserName)
    • AWS roles (e.g. arn:aws:iam::AWS-account-ID:role/role-name)
  2. Request the AWFRunStatusUpdateTopic ARN from the the AWF development team (opens new window)
  3. Subscribe your application to the AWFRunStatusUpdateTopic. For more info see here (opens new window)

# ๐Ÿ“จ Message structure

Here is the schema 3rd party apps should expect for messages coming from the AWF notifications:

{
    "url": "http://testserver/api/run/1/",
    "id": "1",
    "archived": false,
    "owner": "owner",
    "status_display": "Workflow Completed With Errors",
    "arguments": null,
    "resources": [
        {
            "signed_url":"https://aws.s3",
            "url": "http://testserver/api/resource/1/",
            "name": "name",
            "type_display": "Input",
            "date_created": "2021-02-23T20:12:50.982994Z",
            "key": "key"
        }
    ],
    "job_number": null,
    "object": null,
    "status": 3,
    "date_created": "2021-02-23T20:12:50.964969Z",
    "date_updated": "2021-02-23T20:12:50.965996Z",
    "stdout": "test log",
    "stderr": "test error log",
     "metadata": {
        "app_id": "string",
        "tda_msg_key": "string",
        "tda_msg_body": {},
    },
    "workflow": "http://testserver/api/workflow/slug/",
    "worker": "http://testserver/api/worker/8131f141-5f97-49e4-af2d-d074324fe9c8/"
}

# ๐Ÿ” Message structure

Subscribers to AWFRunStatusUpdateTopic can also make use of the notification attributes to filter for only relevant notifications. Subscribers can filter based on the following properties:

  • app_id, a unique identifier for 3rd party app. This id is usually provided when submitting workflow request asynchronously see Submitting workflows
  • workflow_slug, the unique workflow slug
  • status, the AWF workflow status:
    • Workflow Created
    • Workflow Submitted
    • Workflow Assigned
    • Workflow Started
    • Workflow Completed Successfully
    • Workflow Completed With Errors
    • Workflow Stop Requested
    • Workflow Canceled
    • Workflow Terminated With Exception

example filter policy:

{
    "workflow_slug": [
        "tutorial"
    ],
    "app_id": [
        "my_awesome_app"
    ]
}

see here (opens new window) for more details about filter policies