How To Poll for Changes in Acquia DAM
Goal
Detect changes to assets in Acquia DAM.
Overview
This article describes the overarching concepts necessary for polling Acquia DAM using the List Assets by Search Query endpoint. The example program detects asset creations, asset file content updates, and asset information updates.
If you are looking for real-time notifications, please see Ronald Shaw's article about the Webhooks feature. Whether you decide to use Webhooks or polling depends on your use case.
The samples provided are written in Python using the Requests library; however, they can easily be translated into your preferred language and HTTP library.
The code examples only aim to demonstrate the overarching concepts necessary for polling the DAM. A production-grade implementation will look vastly different based on the specific needs of the implementation.
Considerations: When dealing with assets that are affected by automated processes, we recommend using upload profiles that will auto-version conflicting assets. Assets that end up in the conflict queue may be missed by polling.
Setting up your options
There are a few options to set up before reviewing the actual code. While the example shows hard-coded values, these could instead be supplied via a configuration file.
- Obtain an Authentication Token using the Acquia DAM API Documentation
- Determine how often you’d like to poll (in seconds).
- Determine which events to detect:
- new_asset - Initial creation of an asset
- new_asset_version - File content updates
- asset_update - Any action that appears in “Asset History”
- Set an initial start time to pick up events from; it must be a valid ISO 8601 UTC timestamp
- Set a search query to filter the poll to include only assets that meet certain criteria
AUTH_TOKEN = 'YOUR TOKEN HERE'
POLL_INTERVAL = 300  # 5 minutes
EVENT_NAME = 'new_asset'
START_FROM = '2021-03-25T00:00:00Z'
FILTER_QUERY = 'ADVANCED SEARCH QUERY STRING'
Setting up the initial application state
The EVENT_TIMESTAMP_MAP maps from the event you picked to the specific timestamp field in the Acquia DAM API asset response.
The state variable will provide information about the previous poll the next time the polling function is called.
The options variable provides the setup parameters for the polling function. They should be treated as constant values.
EVENT_TIMESTAMP_MAP = {
    'new_asset': 'created_date',
    'new_asset_version': 'file_upload_date',
    'asset_update': 'last_update_date'
}

# Initial Setup
state = {
    'offset': 0,
    'can_poll_more': False,
    'since': datetime.strptime(START_FROM, '%Y-%m-%dT%H:%M:%SZ'),
    'latest_job': datetime.strptime(START_FROM, '%Y-%m-%dT%H:%M:%SZ')
}
OPTIONS = {
    'timestamp_field': EVENT_TIMESTAMP_MAP[EVENT_NAME],
    'filter': FILTER_QUERY,
    'auth': {
        'Authorization': f"Bearer {AUTH_TOKEN}"
    }
}
Polling Loop
For the purpose of this example, we will set up a simple infinite loop to poll assets at a regular interval. When we find jobs, the application will simply print them to the screen. In a practical application, you’d want to enqueue your jobs into some sort of handler.
The polling function can poll for 100 assets at a time, but there may be times that more than 100 events happen within your polling interval. This loop will check the value of ‘can_poll_more’ within the state variable. If there are more assets to poll, the loop will poll again without waiting for the polling interval.
while True:
    state, jobs = poll_assets(state, OPTIONS)

    # Printing for example purposes only.
    # This is where you'd enqueue your jobs in your job handler
    for job in jobs:
        print(f"{job[OPTIONS['timestamp_field']]}: {job['filename']}")

    # If there are more assets, we want to poll again right away.
    # If not, then we can wait for one poll interval
    if not state['can_poll_more']:
        time.sleep(POLL_INTERVAL)
Polling Function
The polling function makes use of the V2 List Assets by Search Query endpoint. Essentially, we want to search for assets that meet our search criteria and sort them in descending order based on the timestamp that corresponds to the events we want to pick up. After searching, we must filter out the jobs that are older than the most recent job from the last polling interval.
The rest of the function handles setting the state for the next polling interval, including taking into account if the polling function should run again immediately to pick up more jobs and the parameters necessary to do that. Finally, the function returns two values: the state of the polling function that should be fed back for the next poll, and an array of asset objects.
def poll_assets(state, options):
    timestamp_field = options['timestamp_field']
    params = {
        'query': options['filter'],
        'limit': 100,
        'offset': state['offset'],
        'sort': f"-{timestamp_field}"
    }
    response = requests.get(
        'https://api.widencollective.com/v2/assets/search',
        headers=options['auth'],
        params=params
    )
    response.raise_for_status()

    # Collect all assets into a list up until the most recent one we ran
    unfiltered_jobs = response.json()['items']
    jobs = [job for job in unfiltered_jobs
            if datetime.strptime(job[timestamp_field], '%Y-%m-%dT%H:%M:%SZ') > state['since']]

    # Since we are ordering our search results in descending order,
    # the first item on the first page will have the time value of the most recent job
    if state['offset'] == 0 and len(jobs) > 0:
        next_latest_job = datetime.strptime(jobs[0][timestamp_field], '%Y-%m-%dT%H:%M:%SZ')
    else:
        next_latest_job = state['latest_job']

    # If there is more to poll, then add 100 to the offset but keep the 'since' value
    # to get the next group of results.
    # Otherwise, set the offset back to 0 and advance the 'since' date
    # to the date of the most recent job
    more_to_poll = len(jobs) == 100
    if more_to_poll:
        next_offset = state['offset'] + 100
        next_since = state['since']
    else:
        next_offset = 0
        next_since = next_latest_job

    next_state = {
        'offset': next_offset,
        'can_poll_more': more_to_poll,
        'since': next_since,
        'latest_job': next_latest_job
    }
    return next_state, jobs
Implementation Notes
Deduplication: New changes may come in while you’re polling, which can cause the polling function to pick up the same assets more than once. Before enqueueing assets into your handler, compare each asset against a record of assets you have already processed to avoid unnecessarily reprocessing them.
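One minimal way to sketch this, assuming each returned asset object carries a unique 'id' field alongside its timestamp (an assumption to verify against your actual response payloads), is to keep a set of already-seen (id, timestamp) pairs and drop any job whose pair has been seen before:

```python
# Minimal in-memory deduplication sketch. Assumes each job dict has an
# 'id' field plus the timestamp field used for polling; a production
# implementation would persist this state and bound its size.
def dedupe_jobs(jobs, seen, timestamp_field):
    fresh = []
    for job in jobs:
        key = (job['id'], job[timestamp_field])
        if key not in seen:
            seen.add(key)        # remember this event
            fresh.append(job)    # only pass through unseen events
    return fresh

# Example: two polls whose results overlap on asset 'a2'
seen = set()
poll_1 = [{'id': 'a1', 'created_date': '2021-03-25T00:01:00Z'},
          {'id': 'a2', 'created_date': '2021-03-25T00:02:00Z'}]
poll_2 = [{'id': 'a2', 'created_date': '2021-03-25T00:02:00Z'},  # duplicate
          {'id': 'a3', 'created_date': '2021-03-25T00:03:00Z'}]
first = dedupe_jobs(poll_1, seen, 'created_date')
second = dedupe_jobs(poll_2, seen, 'created_date')
print([j['id'] for j in first])   # ['a1', 'a2']
print([j['id'] for j in second])  # ['a3']
```

Keying on the (id, timestamp) pair rather than the id alone means a later update to the same asset is still treated as a new event.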
PIM: To detect changes in PIM, you can follow a conceptually similar approach using the V2 List Products endpoint. Note the differences in the timestamps and the requirements for the ‘filter’ and the ‘sort’ parameters.
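As a rough sketch of how the same state machine might be pointed at products instead of assets: only the request parameters change. The endpoint path, the 'filter' parameter syntax, and the timestamp field name below are assumptions to verify against the V2 List Products documentation, not confirmed values.

```python
# Hypothetical request builder for polling products rather than assets.
# PRODUCTS_ENDPOINT, the 'filter' syntax, and 'last_update_date' are
# assumptions -- confirm them in the V2 List Products documentation.
PRODUCTS_ENDPOINT = 'https://api.widencollective.com/v2/products'

def build_product_poll_params(state, timestamp_field, filter_query):
    # Mirrors the asset search parameters, renaming 'query' to 'filter'
    return {
        'filter': filter_query,
        'limit': 100,
        'offset': state['offset'],
        'sort': f"-{timestamp_field}"
    }

params = build_product_poll_params({'offset': 0}, 'last_update_date', 'category:Shoes')
print(params['sort'])  # -last_update_date
```

The rest of the polling function (pagination, the 'since' cutoff, and the returned state) would carry over unchanged.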
Full Worked Example
from datetime import datetime
import requests
import time
# Polling Function
def poll_assets(state, options):
    timestamp_field = options['timestamp_field']
    params = {
        'query': options['filter'],
        'limit': 100,
        'offset': state['offset'],
        'sort': f"-{timestamp_field}"
    }
    response = requests.get(
        'https://api.widencollective.com/v2/assets/search',
        headers=options['auth'],
        params=params
    )
    response.raise_for_status()

    # Collect all assets into a list up until the most recent one we ran
    unfiltered_jobs = response.json()['items']
    jobs = [job for job in unfiltered_jobs
            if datetime.strptime(job[timestamp_field], '%Y-%m-%dT%H:%M:%SZ') > state['since']]

    # Since we are ordering our search results in descending order,
    # the first item on the first page will have the time value of the most recent job
    if state['offset'] == 0 and len(jobs) > 0:
        next_latest_job = datetime.strptime(jobs[0][timestamp_field], '%Y-%m-%dT%H:%M:%SZ')
    else:
        next_latest_job = state['latest_job']

    # If there is more to poll, then add 100 to the offset but keep the 'since' value
    # to get the next group of results.
    # Otherwise, set the offset back to 0 and advance the 'since' date
    # to the date of the most recent job
    more_to_poll = len(jobs) == 100
    if more_to_poll:
        next_offset = state['offset'] + 100
        next_since = state['since']
    else:
        next_offset = 0
        next_since = next_latest_job

    next_state = {
        'offset': next_offset,
        'can_poll_more': more_to_poll,
        'since': next_since,
        'latest_job': next_latest_job
    }
    return next_state, jobs
# Constants, Environment Variables, Configuration Parameters
AUTH_TOKEN = 'YOUR TOKEN HERE'
POLL_INTERVAL = 300 # 5 minutes
EVENT_NAME = 'new_asset'
START_FROM = '2021-03-25T00:00:00Z'
FILTER_QUERY = 'image_group:{User Group Images}'
EVENT_TIMESTAMP_MAP = {
    'new_asset': 'created_date',
    'new_asset_version': 'file_upload_date',
    'asset_update': 'last_update_date'
}

# Initial Setup
state = {
    'offset': 0,
    'can_poll_more': False,
    'since': datetime.strptime(START_FROM, '%Y-%m-%dT%H:%M:%SZ'),
    'latest_job': datetime.strptime(START_FROM, '%Y-%m-%dT%H:%M:%SZ')
}
OPTIONS = {
    'timestamp_field': EVENT_TIMESTAMP_MAP[EVENT_NAME],
    'filter': FILTER_QUERY,
    'auth': {
        'Authorization': f"Bearer {AUTH_TOKEN}"
    }
}

while True:
    state, jobs = poll_assets(state, OPTIONS)

    # Printing for example purposes only.
    # This is where you'd enqueue your jobs in your job handler
    for job in jobs:
        print(f"{job[OPTIONS['timestamp_field']]}: {job['filename']}")

    # If there are more assets, we want to poll again right away.
    # If not, then we can wait for one poll interval
    if not state['can_poll_more']:
        time.sleep(POLL_INTERVAL)