Skip to content

Commit

Permalink
[feature] alert created as per alerta.io api. Slack notifications can…
Browse files Browse the repository at this point in the history
… be configured through alerta.io if you wish.
  • Loading branch information
rajvarkala committed Jan 4, 2024
1 parent cd32ae9 commit 3e278dc
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 4 deletions.
7 changes: 7 additions & 0 deletions .env-example
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,10 @@ OTEL_PYTHON_LOG_CORRELATION=True
OTEL_EXPORTER_OTLP_INSECURE=True

NUM_SAMPLES_PER_CODE=50

# ALERTS CONFIGURATION
ALERTS_ENABLED=False
ALERTS_API_URL="http://host.docker.internal:8080/api/alert"
ALERTS_API_AUTH_HEADER_VALUE="Basic YWRtaW5AYWxlcnRhLmlvOnN1cGVyLXNlY3JldA=="
DEPLOYMENT_ENVIRONMENT="Development" # Should be one of Development or Production for Alerta.io to recognize
ALERTS_ORIGIN="valmi-activation-1"
5 changes: 5 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ services:
OTEL_PYTHON_LOG_CORRELATION: ${OTEL_PYTHON_LOG_CORRELATION}
OTEL_EXPORTER_OTLP_INSECURE: ${OTEL_EXPORTER_OTLP_INSECURE}
VALMI_INTERMEDIATE_STORE: ${VALMI_INTERMEDIATE_STORE}
ALERTS_ENABLED: ${ALERTS_ENABLED}
ALERTS_API_URL: ${ALERTS_API_URL}
ALERTS_API_AUTH_HEADER_VALUE: ${ALERTS_API_AUTH_HEADER_VALUE}
DEPLOYMENT_ENVIRONMENT: ${DEPLOYMENT_ENVIRONMENT}
ALERTS_ORIGIN: ${ALERTS_ORIGIN}


valmi-app-backend:
Expand Down
4 changes: 2 additions & 2 deletions git-tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
"source-postgres": "0.1.13"
}

valmi_core = "0.1.18"
valmi_core = "0.1.19"

valmi_app_backend = "0.1.18"
valmi_app_backend = "0.1.19"

valmi_app = "0.1.21"

Expand Down
3 changes: 3 additions & 0 deletions src/alerts/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .alert_generator import AlertGenerator # noqa: F401

__all__ = ["AlertGenerator"]
130 changes: 130 additions & 0 deletions src/alerts/alert_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
'''
Copyright (c) 2024 valmi.io <https://github.com/valmi-io>
Created Date: Thursday, January 4th 2024, 6:04:15 pm
Author: Rajashekar Varkala @ valmi.io
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
'''

import logging
import threading
import time
from vyper import v
import requests
from requests.auth import HTTPBasicAuth

logger = logging.getLogger(v.get("LOGGER_NAME"))


class AlertGenerator:
__initialized = False

def __new__(cls) -> object:
if not hasattr(cls, "instance"):
cls.instance = super(AlertGenerator, cls).__new__(cls)
return cls.instance

def __init__(self) -> None:
if AlertGenerator.__initialized:
return

self.arr_acc_mutex = threading.RLock()
self.alerts = []
AlertGenerator.__initialized = True

self.alert_list_handler_thread = AlertListHandlerThread(128, "AlertListHandlerThread")
self.alert_list_handler_thread.start()

def sync_status_alert(self, sync_id: str, run_id: str, status: str, value: str) -> None:
if not v.get("ALERTS_ENABLED"):
return

def alert_fn():
sync = None
try:
sync = requests.get(
f"http://{v.get('APP_BACKEND')}:{v.get('APP_BACKEND_PORT')}/api/v1/workspaces/{'DUMMY'}/syncs/{sync_id}",
timeout=v.get("HTTP_REQ_TIMEOUT"),
auth=HTTPBasicAuth(v.get("ADMIN_EMAIL"), v.get("ADMIN_PASSWORD")),
).json()
except Exception:
logger.exception("Sync Details Fetch failed while alerting.")
return

try:
requests.post(
v.get("ALERTS_API_URL"),
timeout=v.get("HTTP_REQ_TIMEOUT"),
headers={
"Content-Type": "application/json",
"Authorization": v.get("ALERTS_API_AUTH_HEADER_VALUE")
},
json={
"correlate": [
"SyncRun" + status.capitalize()
],
"environment": v.get("DEPLOYMENT_ENVIRONMENT"),
"event": "SyncRun" + status.capitalize(),
"group": "SyncRuns",
"origin": v.get("ALERTS_ORIGIN"),
"resource": "Sync: " + sync['name'] + " Run: " + str(run_id),
"service": [
str(run_id)
],
"severity": "normal" if status == "terminated" else "major",
"text": "Run with id (" + str(run_id) + ") " + status + " for " + sync['name'] + " sync.",
"value": value
},
)
except Exception:
logger.exception("Failed to send sync run status alert.")
return

with self.arr_acc_mutex:
self.alerts.append(alert_fn)

def destroy(self) -> None:
self.cleaner_thread.exit_flag = True


class AlertListHandlerThread(threading.Thread):
def __init__(self, thread_id: int, name: str) -> None:
threading.Thread.__init__(self)
self.thread_id = thread_id
self.exit_flag = False
self.name = name

def run(self) -> None:
while not self.exit_flag:
try:
logger.info("Running alerts")

alerts = []
with AlertGenerator().arr_acc_mutex:
alerts = AlertGenerator().alerts.copy()
AlertGenerator().alerts.clear()

for alert in alerts:
alert()

time.sleep(15)
except Exception:
logger.exception("Error while sending alerts")
time.sleep(15)
20 changes: 19 additions & 1 deletion src/orchestrator/run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from sqlalchemy.orm.attributes import flag_modified
from queue import Queue
from api.services import SyncsService, SyncRunsService
from alerts import AlertGenerator

logger = logging.getLogger(v.get("LOGGER_NAME"))
TICK_INTERVAL = 1
Expand Down Expand Up @@ -149,6 +150,15 @@ def _run(self):
self.sync_service.update_sync_and_run(sync, run)

elif sync.run_status == SyncStatus.ABORTING:
# Corner Case: if already succeeded, then just consider it finished
run = self.run_service.get(sync.last_run_id)
dagster_run_status = self.dc.get_run_status(run.dagster_run_id)
if dagster_run_status == DagsterRunStatus.SUCCESS:
sync.run_status = SyncStatus.RUNNING
run.status = SyncStatus.RUNNING
self.sync_service.update_sync_and_run(sync, run)
continue

logger.info(sync.run_status)
run = self.run_service.get(sync.last_run_id)
self.abort_active_run(sync, run)
Expand Down Expand Up @@ -222,6 +232,9 @@ def _run(self):
run.status = SyncStatus.FAILED
status = "failed"
error_msg = run.extra[key]["status"]["message"]

# Send an alert
AlertGenerator().sync_status_alert(sync.sync_id, run.run_id, status, error_msg)
break

if error_msg is None:
Expand Down Expand Up @@ -256,7 +269,12 @@ def _run(self):

status = "failed" if dagster_run_status == DagsterRunStatus.FAILURE else "terminated"

run.extra["run_manager"]["status"] = {"status": status, "message": "FILL THIS IN!"}
msg = "FILL THIS IN!"
run.extra["run_manager"]["status"] = {"status": status, "message": msg}

# Send an alert
AlertGenerator().sync_status_alert(sync.sync_id, run.run_id, status, msg)

flag_modified(run, "extra")
flag_modified(run, "status")

Expand Down
2 changes: 1 addition & 1 deletion valmi-app-backend

0 comments on commit 3e278dc

Please sign in to comment.