import json
from sdcclient._common import _SdcCommon
class ScanningAlertsClientV1(_SdcCommon):
def __init__(self, token="", sdc_url='https://secure.sysdig.com', ssl_verify=True, custom_headers=None):
super(ScanningAlertsClientV1, self).__init__(token, sdc_url, ssl_verify, custom_headers)
self.product = "SDS"
class RepositoryAlertTrigger:
@staticmethod
def new_image_analyzed(alert):
alert["triggers"]["analysis_update"] = True
@staticmethod
def scan_result_change_fail(alert):
alert["triggers"]["policy_eval"] = True
alert["onlyPassFail"] = True
@staticmethod
def scan_result_change_any(alert):
alert["triggers"]["policy_eval"] = True
alert["onlyPassFail"] = False
@staticmethod
def cve_update(alert):
alert["triggers"]["vuln_update"] = True
def add_repository_alert(self, name, registry, repository, tag, description="", triggers=None, notification_channels=None, enabled=True):
'''
Create a new repository alert
Args:
name(str): The name of the alert.
registry(str): Registry to alert (e.g. docker.io)
repository(str): Repository to alert (e.g. sysdig/agent)
tag(str): Tag to alert (e.g. latest)
description(str): The description of the alert.
triggers(list): A list of RepositoryAlertTrigger indicating which triggers should be enabled. (default: [ScanningAlertsClientV1.RuntimeAlertTrigger.new_image_analyzed])
notification_channels(list): A list of notification channel ids.
enabled(bool): Whether this alert should actually be applied. Defaults to true.
Returns:
The created alert.
Examples:
>>> client = ScanningAlertsClientV1(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"),
>>> token=os.getenv("SDC_SECURE_TOKEN"))
>>> ok, res = client.add_repository_alert(
>>> name="A name",
>>> registry="docker.io",
>>> repository="sysdig/agent",
>>> tag="latest",
>>> description="A description",
>>> triggers=[ScanningAlertsClientV1.RepositoryAlertTrigger.new_image_analyzed,
>>> ScanningAlertsClientV1.RepositoryAlertTrigger.scan_result_change_fail,
>>> ScanningAlertsClientV1.RepositoryAlertTrigger.cve_update]
>>>)
>>> if not ok:
>>> print(f"error creating alert: {res}")
>>> alert_id = res["alertId"]
'''
if not triggers:
triggers = [ScanningAlertsClientV1.RepositoryAlertTrigger.new_image_analyzed]
alert = {
'name': name,
'description': description,
'type': 'repository',
'triggers': {
"unscanned": False,
"analysis_update": False,
"vuln_update": False,
"policy_eval": False,
"failed": False
},
'repositories': [{
'registry': registry,
'repository': repository,
'tag': tag,
}],
"onlyPassFail": False,
"skipEventSend": False,
'enabled': enabled,
'notificationChannelIds': notification_channels,
}
for trigger in triggers:
trigger(alert)
res = self.http.post(f"{self.url}/api/scanning/v1/alerts", headers=self.hdrs, data=json.dumps(alert), verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]
return [True, res.json()]
def update_repository_alert(self, id, name=None, registry=None, repository=None, tag=None, description=None, triggers=None, notification_channels=None, enabled=None):
'''
Updates a repository alert. Fields that are not specified, will not be modified.
Args:
id(str): Alert ID.
name(str): The name of the alert.
registry(str): Registry to alert (e.g. docker.io)
repository(str): Repository to alert (e.g. sysdig/agent)
tag(str): Tag to alert (e.g. latest)
description(str): The description of the alert.
triggers(list): A list of RepositoryAlertTrigger indicating which triggers should be enabled. (default: [ScanningAlertsClientV1.RuntimeAlertTrigger.unscanned_image])
notification_channels(list): A list of notification channel ids.
enabled(bool): Whether this alert should actually be applied. Defaults to true.
Returns:
The updated alert.
Examples:
>>> client = ScanningAlertsClientV1(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"),
>>> token=os.getenv("SDC_SECURE_TOKEN"))
>>> ok, res = client.update_repository_alert(
>>> id=alert_id,
>>> name="An updated name",
>>> registry="updated_registry",
>>> repository="updated_repository",
>>> tag="v1",
>>> description="An updated description",
>>> triggers=[ScanningAlertsClientV1.RepositoryAlertTrigger.scan_result_change_fail]
>>> )
>>> if not ok:
>>> print(f"error updating alert: {res}")
>>> alert_id = res["alertId"]
'''
ok, alert = self.get_alert(id)
if not ok:
return False, f"unable to retrieve alert by ID {id}: {alert}"
if name is not None:
alert["name"] = name
if description is not None:
alert["description"] = description
if registry is not None:
alert["repositories"][0]["registry"] = registry
if repository is not None:
alert["repositories"][0]["repository"] = repository
if tag is not None:
alert["repositories"][0]["tag"] = tag
if triggers is not None:
alert["triggers"] = {
"unscanned": False,
"analysis_update": False,
"vuln_update": False,
"policy_eval": False,
"failed": False
}
alert["onlyPassFail"] = False
for trigger in triggers:
trigger(alert)
if notification_channels is not None:
alert["notificationChannelIds"] = notification_channels
if enabled is not None:
alert["enabled"] = enabled
res = self.http.put(f"{self.url}/api/scanning/v1/alerts/{id}", headers=self.hdrs, data=json.dumps(alert), verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]
return [True, res.json()]
class RuntimeAlertTrigger:
@staticmethod
def unscanned_image(alert):
alert["triggers"]["unscanned"] = True
@staticmethod
def scan_result_change_fail(alert):
alert["triggers"]["policy_eval"] = True
alert["onlyPassFail"] = True
@staticmethod
def scan_result_change_any(alert):
alert["triggers"]["policy_eval"] = True
alert["onlyPassFail"] = False
@staticmethod
def cve_update(alert):
alert["triggers"]["vuln_update"] = True
def add_runtime_alert(self, name, description="", scope="", triggers=None, notification_channels=None, enabled=True):
'''
Create a new runtime alert
Args:
name(str): The name of the alert.
description(str): The description of the alert.
scope(str): An AND-composed string of predicates that selects the scope in which the alert will be applied. (like: 'host.domain = "example.com" and container.image != "alpine:latest"')
triggers(list): A list of RuntimeAlertTrigger indicating which triggers should be enabled. (default: [ScanningAlertsClientV1.RuntimeAlertTrigger.unscanned_image])
notification_channels(list): A list of notification channel ids.
enabled(bool): Whether this alert should actually be applied. Defaults to true.
Returns:
The created alert.
Examples:
>>> client = ScanningAlertsClientV1(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"),
>>> token=os.getenv("SDC_SECURE_TOKEN"))
>>> ok, res = client.add_runtime_alert(
>>> name="A name",
>>> description="A description",
>>> scope="",
>>> triggers=[ScanningAlertsClientV1.RuntimeAlertTrigger.unscanned_image,
>>> ScanningAlertsClientV1.RuntimeAlertTrigger.scan_result_change_fail,
>>> ScanningAlertsClientV1.RuntimeAlertTrigger.cve_update]
>>>)
>>> if not ok:
>>> print(f"error creating alert: {res}")
>>> alert_id = res["alertId"]
'''
if not triggers:
triggers = [ScanningAlertsClientV1.RuntimeAlertTrigger.unscanned_image]
alert = {
'name': name,
'description': description,
'type': 'runtime',
'triggers': {
"unscanned": False,
"analysis_update": False,
"vuln_update": False,
"policy_eval": False,
"failed": False
},
'scope': scope,
"onlyPassFail": False,
"skipEventSend": False,
'enabled': enabled,
'notificationChannelIds': notification_channels,
}
for trigger in triggers:
trigger(alert)
res = self.http.post(f"{self.url}/api/scanning/v1/alerts", headers=self.hdrs, data=json.dumps(alert), verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]
return [True, res.json()]
def update_runtime_alert(self, id, name=None, description=None, scope=None, triggers=None, notification_channels=None, enabled=None):
'''
Updates a runtime alert. Fields that are not specified, will not be modified.
Args:
id(str): Alert ID.
name(str): The name of the alert.
description(str): The description of the alert.
scope(str): An AND-composed string of predicates that selects the scope in which the alert will be applied. (like: 'host.domain = "example.com" and container.image != "alpine:latest"')
triggers(list): A list of RuntimeAlertTrigger indicating which triggers should be enabled. (default: [ScanningAlertsClientV1.RuntimeAlertTrigger.unscanned_image])
notification_channels(list): A list of notification channel ids.
enabled(bool): Whether this alert should actually be applied. Defaults to true.
Returns:
The updated alert.
Examples:
>>> client = ScanningAlertsClientV1(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"),
>>> token=os.getenv("SDC_SECURE_TOKEN"))
>>> ok, res = client.update_runtime_alert(
>>> id=alert_id,
>>> name="An updated name",
>>> description="An updated description",
>>> scope="agent.id = 'foo'",
>>> triggers=[ScanningAlertsClientV1.RuntimeAlertTrigger.scan_result_change_fail]
>>> )
>>> if not ok:
>>> print(f"error updating alert: {res}")
>>> alert_id = res["alertId"]
'''
ok, alert = self.get_alert(id)
if not ok:
return False, f"unable to retrieve alert by ID {id}: {alert}"
if name is not None:
alert["name"] = name
if description is not None:
alert["description"] = description
if scope is not None:
alert["scope"] = scope
if triggers is not None:
alert["triggers"] = {
"unscanned": False,
"analysis_update": False,
"vuln_update": False,
"policy_eval": False,
"failed": False
}
alert["onlyPassFail"] = False
for trigger in triggers:
trigger(alert)
if notification_channels is not None:
alert["notificationChannelIds"] = notification_channels
if enabled is not None:
alert["enabled"] = enabled
res = self.http.put(f"{self.url}/api/scanning/v1/alerts/{id}", headers=self.hdrs, data=json.dumps(alert), verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]
return [True, res.json()]
def get_alert(self, alertid):
'''
Retrieve the scanning alert with the given id
Args:
alertid: Unique identifier associated with this alert.
Returns:
A JSON object containing the alert description.
Examples:
>>> client = ScanningAlertsClientV1(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"),
>>> token=os.getenv("SDC_SECURE_TOKEN"))
>>> ok, res = client.get_alert(alert_id)
>>> if not ok:
>>> print(f"error retrieving alert {alert_id}: {res}")
>>> alert = res
'''
res = self.http.get(f"{self.url}/api/scanning/v1/alerts/{alertid}", headers=self.hdrs, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]
return [True, res.json()]
def list_alerts(self, limit=None, cursor=None):
'''
List the current set of scanning alerts.
Args:
limit(int): Maximum number of alerts in the response.
cursor: An opaque string representing the current position in the list of alerts. It's provided in the 'responseMetadata' of the list_alerts response.
Returns:
A JSON containing the list of alerts in the 'alerts' field, and the current cursor position in the 'responseMetadata' field.
Examples:
>>> client = ScanningAlertsClientV1(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"),
>>> token=os.getenv("SDC_SECURE_TOKEN"))
>>> ok, res =client.list_alerts()
>>> if not ok:
>>> print(f"error listing alerts: {res}")
>>> for alert in res["alerts"]:
>>> print(alert["alertId"])
>>>
>>> # Load more alerts
>>> if res["responseMetadata"] is not None:
>>> ok, res = client.list_alerts(cursor=res["responseMetadata"]["next_cursor"])
'''
url = f"{self.url}/api/scanning/v1/alerts"
if limit:
url += '?limit=' + str(limit)
if cursor:
url += '&cursor=' + cursor
res = self.http.get(url, headers=self.hdrs, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]
return [True, res.json()]
def delete_alert(self, policyid): # FIXME: policyid must be maintained for backwards compatibility reasons with older versions, but should be renamed to id or alert_id
'''
Delete the alert with the given id
Args:
policyid: Unique identifier associated with this alert.
Examples:
>>> client = ScanningAlertsClientV1(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"),
>>> token=os.getenv("SDC_SECURE_TOKEN"))
>>> client.delete_alert(alert_id)
'''
res = self.http.delete(f"{self.url}/api/scanning/v1/alerts/{policyid}", headers=self.hdrs, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]
return [True, res.text]
def add_alert_object(self, object):
'''
Adds alert object as raw JSON object.
Args:
object: JSON repsentation of the alert.
Examples:
>>> client = ScanningAlertsClientV1(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"),
>>> token=os.getenv("SDC_SECURE_TOKEN"))
>>> alert = {
>>> "enabled": True,
>>> "type": "runtime",
>>> "name": "runtime-scanning-alert",
>>> "triggers": {
>>> "unscanned": True,
>>> "analysis_update": False,
>>> "vuln_update": False,
>>> "policy_eval": False,
>>> "failed": False
>>> },
>>> "autoscan": False,
>>> "onlyPassFail": False,
>>> "skipEventSend": False,
>>> "notificationChannelIds": []
>>> }
>>> client.add_alert_object(alert)
'''
url = self.url + '/api/scanning/v1/alerts'
data = json.dumps(object)
res = self.http.post(url, headers=self.hdrs, data=data, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]
return [True, res.json()]