Source code for sdcclient._monitor

import json
from typing import Any, Tuple, Union

from sdcclient._common import _SdcCommon
from sdcclient.monitor import EventsClientV2, DashboardsClientV3

[docs]class SdMonitorClient(DashboardsClientV3, EventsClientV2, _SdcCommon): def __init__(self, token="", sdc_url='', ssl_verify=True, custom_headers=None): super(SdMonitorClient, self).__init__(token, sdc_url, ssl_verify, custom_headers) self.product = "SDC"
[docs] def get_alerts(self) -> Union[Tuple[bool, str], Tuple[bool, Any]]: ''' Retrieve the list of alerts configured by the user. Returns: A tuple where the first parameter indicates if the call was successful, and the second parameter holds either the error as string, or the response object. Examples: >>> ok, res = client.get_alerts() >>> for alert in res['alerts']: >>> print(f'enabled: {str(alert["enabled"])}, name: {alert["name"]}' ) ''' res = self.http.get(self.url + '/api/alerts', headers=self.hdrs, verify=self.ssl_verify) return self._request_result(res)
[docs] def get_notifications(self, from_ts, to_ts, state=None, resolved=None) -> Union[Tuple[bool, str], Tuple[bool, Any]]: ''' Returns the list of Sysdig Monitor alert notifications. Args: from_ts (int): filter events by start time. Timestamp format is in UTC (seconds). to_ts (int): filter events by start time. Timestamp format is in UTC (seconds). state (str): filter events by alert state. Supported values are ``OK`` and ``ACTIVE``. resolved (str): filter events by resolution status. Supported values are "True" and "False". Returns: A tuple where the first parameter indicates if the call was successful, and the second parameter holds either the error as string, or the response object. Examples: >>> # Get the notifications in the last day >>> ok, res = client.get_notifications(from_ts=int(time.time() - 86400), to_ts=int(time.time())) >>> # Get the notifications in the last day and active state >>> ok, res = client.get_notifications(from_ts=int(time.time() - 86400), to_ts=int(time.time()), state='ACTIVE') >>> # Get the notifications in the last day and active state >>> ok, res = client.get_notifications(from_ts=int(time.time() - 86400), to_ts=int(time.time()), state='OK') >>> # Get the notifications in the last day and resolved state >>> ok, res = client.get_notifications(from_ts=int(time.time() - 86400), to_ts=int(time.time()), resolved=True) ''' params = {} if from_ts is not None: params['from'] = from_ts * 1000000 if to_ts is not None: params['to'] = to_ts * 1000000 if state is not None: params['state'] = state if resolved is not None: params['resolved'] = resolved res = self.http.get(self.url + '/api/notifications', headers=self.hdrs, params=params, verify=self.ssl_verify) if not self._checkResponse(res): return False, self.lasterr return True, res.json()
[docs] def update_notification_resolution(self, notification, resolved) -> Union[Tuple[bool, str], Tuple[bool, Any]]: ''' Updates the resolution status of an alert notification. Args: notification (object): notification object as returned by :func:`~SdcClient.get_notifications`. resolved (str): new resolution status. Supported values are ``True`` and ``False``. Returns: A tuple where the first parameter indicates if the call was successful, and the second parameter holds either the error as string, or the response object. Examples: >>> # Get the unresolved notifications in the last day >>> ok, res = sdclient.get_notifications(from_ts=int(time.time() - int(num_days_to_resolve) * 86400), to_ts=int(time.time()), resolved=False) >>> # Resolve all of them >>> for notification in notifications: >>> ok, res = sdclient.update_notification_resolution(notification, True) ''' if 'id' not in notification: return False, 'Invalid notification format' notification['resolved'] = resolved data = {'notification': notification} res = self.http.put(self.url + '/api/notifications/' + str(notification['id']), headers=self.hdrs, data=json.dumps(data), verify=self.ssl_verify) return self._request_result(res)
[docs] def create_alert(self, name=None, description=None, severity=None, for_atleast_s=None, condition=None, segmentby=None, segment_condition='ANY', user_filter='', notify=None, enabled=True, annotations=None, alert_obj=None, type="MANUAL") -> Union[Tuple[bool, str], Tuple[bool, Any]]: ''' Create a threshold-based alert. Args: name (str): the alert name. This will appear in the Sysdig Monitor UI and in notification emails description (str): the alert description. This will appear in the Sysdig Monitor UI and in notification emails severity (int): syslog-encoded alert severity. This is a number from 0 to 7 where 0 means 'emergency' and 7 is 'debug' for_atleast_s (int): the number of consecutive seconds the condition must be satisfied for the alert to fire condition (int): the alert condition, as described here!/Alerts/post_api_alerts segmentby (List(str)): a list of Sysdig Monitor segmentation criteria that can be used to apply the alert to multiple entities. For example, segmenting a CPU alert by ``['host.mac', '']`` allows to apply it to any process in any machine. segment_condition (str): When :param:`segmentby` is specified (and therefore the alert will cover multiple entities) this field is used to determine when it will fire. In particular, you have two options for *segment_condition*: **ANY** (the alert will fire when at least one of the monitored entities satisfies the condition) and **ALL** (the alert will fire when all of the monitored entities satisfy the condition). user_filter (str): a boolean expression combining Sysdig Monitor segmentation criteria that makes it possible to reduce the scope of the alert. For example: ``'production' and container.image='nginx'``. notify (str): the type of notification you want this alert to generate. Options are ``EMAIL``, ``SNS``, ``PAGER_DUTY``, ``SYSDIG_DUMP`` enabled (bool): if True, the alert will be enabled when created. annotations (dict): an optional dictionary of custom properties that you can associate to this alert for automation or management reasons. alert_obj (object): an optional fully-formed Alert object of the format returned in an "alerts" list by :func:`~SdcClient.get_alerts` This is an alternative to creating the Alert using the individual parameters listed above. type (str): the type of the alert, ``MANUAL`` if the alert uses a normal query, ``PROMETHEUS`` if it's PromQL Returns: A tuple where the first parameter indicates if the call was successful, and the second parameter holds either the error as string, or the response object. ''' if annotations is None: annotations = {} if segmentby is None: segmentby = [] # # Get the list of alerts from the server # res = self.http.get(self.url + '/api/alerts', headers=self.hdrs, verify=self.ssl_verify) if not self._checkResponse(res): return False, self.lasterr res.json() if alert_obj is None: if None in (name, description, severity, for_atleast_s, condition): return False, 'Must specify a full Alert object or all parameters: ' \ 'name, description, severity, for_atleast_s, condition' else: # # Populate the alert information # alert_json = { 'alert': { 'type': type, 'name': name, 'description': description, 'enabled': enabled, 'severity': severity, 'timespan': for_atleast_s * 1000000, 'condition': condition, 'filter': user_filter } } if segmentby: alert_json['alert']['segmentBy'] = segmentby alert_json['alert']['segmentCondition'] = {'type': segment_condition} if annotations: alert_json['alert']['annotations'] = annotations if notify is not None: alert_json['alert']['notificationChannelIds'] = notify else: # The REST API enforces "Alert ID and version must be null", so remove them if present, # since these would have been there in a dump from the example. alert_obj.pop('id', None) alert_obj.pop('version', None) alert_json = { 'alert': alert_obj } # # Create the new alert # res = + '/api/alerts', headers=self.hdrs, data=json.dumps(alert_json), verify=self.ssl_verify) return self._request_result(res)
[docs] def update_alert(self, alert) -> Union[Tuple[bool, str], Tuple[bool, Any]]: ''' Update a modified threshold-based alert. Args: alert (object): one modified alert object of the same format as those in the list returned by :func:`~SdcClient.get_alerts`. Returns: A tuple where the first parameter indicates if the call was successful, and the second parameter holds either the error as string, or updated alert. Examples: >>> ok, res = client.get_alerts() >>> if not ok: >>> sys.exit(1) >>> for alert in res['alerts']: >>> if alert['name'] == alert_name: >>> alert['timespan'] = alert['timespan'] * 2 # Note: Expressed in seconds * 1000000 >>> ok, res_update = client.update_alert(alert) ''' if 'id' not in alert: return False, "Invalid alert format" res = self.http.put(self.url + '/api/alerts/' + str(alert['id']), headers=self.hdrs, data=json.dumps({"alert": alert}), verify=self.ssl_verify) return self._request_result(res)
[docs] def delete_alert(self, alert) -> Union[Tuple[bool, str], Tuple[bool, Any]]: '''**Description** Deletes an alert. **Arguments** - **alert**: the alert dictionary as returned by :func:`~SdcClient.get_alerts`. **Success Return Value** ``None``. **Example** `examples/ <>`_ ''' if 'id' not in alert: return False, 'Invalid alert format' res = self.http.delete(self.url + '/api/alerts/' + str(alert['id']), headers=self.hdrs, verify=self.ssl_verify) if not self._checkResponse(res): return False, self.lasterr return True, None
[docs] def get_explore_grouping_hierarchy(self) -> Union[Tuple[bool, str], Tuple[bool, Any]]: '''**Description** Return the user's current grouping hierarchy as visible in the Explore tab of Sysdig Monitor. **Success Return Value** A list containing the list of the user's Explore grouping criteria. **Example** `examples/ <>`_ ''' res = self.http.get(self.url + '/api/groupConfigurations', headers=self.hdrs, verify=self.ssl_verify) if not self._checkResponse(res): return False, self.lasterr data = res.json() if 'groupConfigurations' not in data: return False, 'corrupted groupConfigurations API response' gconfs = data['groupConfigurations'] for gconf in gconfs: if gconf['id'] == 'explore': res = [] items = gconf['groups'][0]['groupBy'] for item in items: res.append(item['metric']) return True, res return False, 'corrupted groupConfigurations API response, missing "explore" entry'
[docs] def set_explore_grouping_hierarchy(self, new_hierarchy) -> Union[Tuple[bool, str], Tuple[bool, Any]]: '''**Description** Changes the grouping hierarchy in the Explore panel of the current user. **Arguments** - **new_hierarchy**: a list of sysdig segmentation metrics indicating the new grouping hierarchy. ''' body = { 'id': 'explore', 'groups': [{'groupBy': []}] } for item in new_hierarchy: body['groups'][0]['groupBy'].append({'metric': item}) res = self.http.put(self.url + '/api/groupConfigurations/explore', headers=self.hdrs, data=json.dumps(body), verify=self.ssl_verify) if not self._checkResponse(res): return False, self.lasterr else: return True, None
[docs] def get_metrics(self) -> Union[Tuple[bool, str], Tuple[bool, Any]]: '''**Description** Return the metric list that can be used for data requests/alerts/dashboards. **Success Return Value** A dictionary containing the list of available metrics. **Example** `examples/ <>`_ ''' res = self.http.get(self.url + '/api/data/metrics', headers=self.hdrs, verify=self.ssl_verify) return self._request_result(res)
[docs] @staticmethod def convert_scope_string_to_expression(scope) -> Union[Tuple[bool, str], Tuple[bool, Any]]: '''**Description** Internal function to convert a filter string to a filter object to be used with dashboards. ''' # # NOTE: The supported grammar is not perfectly aligned with the grammar supported by the Sysdig backend. # Proper grammar implementation will happen soon. # For practical purposes, the parsing will have equivalent results. # if scope is None or not scope: return True, [] expressions = [] string_expressions = scope.strip(' \t\n\r').split(' and ') expression_re = re.compile( '^(?P<not>not )?(?P<operand>[^ ]+) (?P<operator>=|!=|in|contains|starts with) (?P<value>(:?"[^"]+"|\'[^\']+\'|(.+)|.+))$') for string_expression in string_expressions: matches = expression_re.match(string_expression) if matches is None: return False, 'invalid scope format' is_not_operator ='not') is not None if'operator') == 'in': list_value ='value').strip(' ()') value_matches = re.findall('(:?\'[^\',]+\')|(:?"[^",]+")|(:?[,]+)', list_value) if len(value_matches) == 0: return False, 'invalid scope value list format' value_matches = map(lambda v: v[0] if v[0] else v[1], value_matches) values = map(lambda v: v.strip(' "\''), value_matches) else: values = ['value').strip('"\'')] operator_parse_dict = { 'in': 'in' if not is_not_operator else 'notIn', '=': 'equals' if not is_not_operator else 'notEquals', '!=': 'notEquals' if not is_not_operator else 'equals', 'contains': 'contains' if not is_not_operator else 'notContains', 'starts with': 'startsWith' } operator = operator_parse_dict.get('operator'), None) if operator is None: return False, 'invalid scope operator' expressions.append({ 'operand':'operand'), 'operator': operator, 'value': values }) return True, expressions
# For backwards compatibility SdcClient = SdMonitorClient