Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Telemetry to Understand Khoj Usage #215

Merged
merged 8 commits into from
May 17, 2023
45 changes: 45 additions & 0 deletions .github/workflows/dockerize_telemetry_server.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Build the telemetry server Docker image and push it to the GitHub Container Registry.
# Runs when the telemetry server sources or this workflow change on master or in a
# pull request against master, and can also be triggered manually.
name: dockerize telemetry server

on:
  push:
    branches:
      - master
    paths:
      - src/telemetry/**
      - .github/workflows/dockerize_telemetry_server.yml
  pull_request:
    branches:
      - master
    paths:
      - src/telemetry/**
      - .github/workflows/dockerize_telemetry_server.yml
  workflow_dispatch:

env:
  # Images built from master are tagged `latest`; pull request builds use the PR number.
  # A manual (workflow_dispatch) run on another branch has no PR number, so fall back
  # to the commit SHA instead of producing an empty, invalid image tag.
  DOCKER_IMAGE_TAG: ${{ github.ref == 'refs/heads/master' && 'latest' || github.event.pull_request.number || github.sha }}

jobs:
  build:
    name: Build Docker Image, Push to Container Registry
    runs-on: ubuntu-latest
    steps:
      - name: Checkout Code
        uses: actions/checkout@v3

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2

      - name: Login to GitHub Container Registry
        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.repository_owner }}
          password: ${{ secrets.PAT }}

      - name: 📦 Build and Push Docker Image
        uses: docker/build-push-action@v2
        with:
          # Build only the telemetry server, not the whole repository
          context: src/telemetry
          file: src/telemetry/Dockerfile
          push: true
          tags: ghcr.io/${{ github.repository }}-telemetry:${{ env.DOCKER_IMAGE_TAG }}
21 changes: 19 additions & 2 deletions src/khoj/configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import json
from enum import Enum
import requests

# External Packages
import schedule
Expand Down Expand Up @@ -62,7 +63,7 @@ def configure_routes(app):
app.include_router(web_client)


@schedule.repeat(schedule.every(1).hour)
@schedule.repeat(schedule.every(61).minutes)
def update_search_index():
state.search_index_lock.acquire()
state.model = configure_search(state.model, state.config, regenerate=False)
Expand Down Expand Up @@ -189,7 +190,7 @@ def configure_conversation_processor(conversation_processor_config):
return conversation_processor


@schedule.repeat(schedule.every(15).minutes)
@schedule.repeat(schedule.every(17).minutes)
def save_chat_session():
# No need to create empty log file
if not (
Expand Down Expand Up @@ -223,3 +224,19 @@ def save_chat_session():

state.processor_config.conversation.chat_session = None
logger.info("📩 Saved current chat session to conversation logs")


@schedule.repeat(schedule.every(59).minutes)
def upload_telemetry():
    """Upload the usage telemetry buffered in `state.telemetry` to the telemetry server.

    Does nothing when telemetry is disabled in the app config or when there is
    nothing buffered. The buffer is only cleared after the server accepts the
    upload, so failed uploads are retried on the next scheduled run.
    """
    # The app section of the config is optional, so guard against it being unset
    app_config = state.config.app
    telemetry_enabled = bool(app_config) and app_config.should_log_telemetry
    if not telemetry_enabled or not state.telemetry:
        message = "📡 No telemetry to upload" if not state.telemetry else "📡 Telemetry logging disabled"
        logger.debug(message)
        return

    # log_telemetry returns an empty placeholder while telemetry is disabled.
    # Drop those so a single placeholder cannot get the whole batch rejected.
    payload = [entry for entry in state.telemetry if entry]
    if not payload:
        state.telemetry = []
        logger.debug("📡 No telemetry to upload")
        return

    # Bound the request so an unresponsive server cannot stall the scheduler
    upload_timeout_in_seconds = 30

    try:
        logger.debug(f"📡 Upload usage telemetry to {constants.telemetry_server}:\n{payload}")
        response = requests.post(constants.telemetry_server, json=payload, timeout=upload_timeout_in_seconds)
        # Treat HTTP error responses as failures so the telemetry is kept for retry
        response.raise_for_status()
    except Exception as e:
        # Telemetry is best-effort. Never let an upload failure break the app
        logger.error(f"📡 Error uploading telemetry: {e}")
    else:
        state.telemetry = []
11 changes: 10 additions & 1 deletion src/khoj/routers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from khoj.processor.conversation.gpt import converse, extract_questions
from khoj.processor.conversation.utils import message_to_log, message_to_prompt
from khoj.search_type import image_search, text_search
from khoj.utils.helpers import timer
from khoj.utils.helpers import log_telemetry, timer
from khoj.utils.rawconfig import FullConfig, SearchResponse
from khoj.utils.state import SearchType
from khoj.utils import state, constants
Expand Down Expand Up @@ -168,6 +168,11 @@ def search(
# Cache results
state.query_cache[query_cache_key] = results

# Only log telemetry if query is new and not a continuation of previous query
if state.previous_query is None or state.previous_query not in user_query:
state.telemetry += [log_telemetry(telemetry_type="api", api="search", app_config=state.config.app)]
state.previous_query = user_query

return results


Expand All @@ -191,6 +196,8 @@ def update(t: Optional[SearchType] = None, force: Optional[bool] = False):
else:
logger.info("📬 Processor reconfigured via API")

state.telemetry += [log_telemetry(telemetry_type="api", api="update", app_config=state.config.app)]

return {"status": "ok", "message": "khoj reloaded"}


Expand Down Expand Up @@ -251,4 +258,6 @@ def chat(q: Optional[str] = None):
conversation_log=meta_log.get("chat", []),
)

state.telemetry += [log_telemetry(telemetry_type="api", api="chat", app_config=state.config.app)]

return {"status": status, "response": gpt_response, "context": compiled_references}
2 changes: 2 additions & 0 deletions src/khoj/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
app_root_directory = Path(__file__).parent.parent.parent
web_directory = app_root_directory / "khoj/interface/web/"
empty_escape_sequences = "\n|\r|\t| "
app_env_filepath = "~/.khoj/env"
telemetry_server = "https://khoj.beta.haletic.com/v1/telemetry"

# default app config to use
default_config = {
Expand Down
77 changes: 72 additions & 5 deletions src/khoj/utils/helpers.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
# Standard Packages
from __future__ import annotations # to avoid quoting type hints
import logging
import sys
import torch
from collections import OrderedDict
import datetime
from importlib import import_module
from os.path import join
import logging
from os import path
from pathlib import Path
import platform
import requests
import sys
from time import perf_counter
import torch
from typing import Optional, Union, TYPE_CHECKING
import uuid

# Internal Packages
from khoj.utils import constants


if TYPE_CHECKING:
# External Packages
from sentence_transformers import CrossEncoder

# Internal Packages
from khoj.utils.models import BaseEncoder
from khoj.utils.rawconfig import AppConfig


def is_none_or_empty(item):
Expand Down Expand Up @@ -59,7 +68,7 @@ def load_model(model_name: str, model_type, model_dir=None, device: str = None)
"Load model from disk or huggingface"
# Construct model path
logger = logging.getLogger(__name__)
model_path = join(model_dir, model_name.replace("/", "_")) if model_dir is not None else None
model_path = path.join(model_dir, model_name.replace("/", "_")) if model_dir is not None else None

# Load model from model_path if it exists there
model_type_class = get_class_by_name(model_type) if isinstance(model_type, str) else model_type
Expand Down Expand Up @@ -123,3 +132,61 @@ def __setitem__(self, key, value):
if len(self) > self.capacity:
oldest = next(iter(self))
del self[oldest]


def get_server_id(app_env_filepath: Optional[str] = None) -> str:
    """Get, Generate Persistent, Random ID per server install.
    Helps count distinct khoj servers deployed.
    Maintains anonymity by using non-PII random id.

    Args:
        app_env_filepath: Path to the khoj env file holding persistent internal
            app data. Defaults to `constants.app_env_filepath`. `~` is expanded.

    Returns:
        The server id stored in the env file. If the file or the id is missing,
        a new random id is generated, persisted to the file and returned.
    """
    # Expand path to the khoj env file. It contains persistent internal app data
    app_env_filename = path.expanduser(app_env_filepath or constants.app_env_filepath)

    existing_lines = []
    if path.exists(app_env_filename):
        # Read the contents of the file
        with open(app_env_filename, "r") as f:
            existing_lines = f.readlines()

        # Extract the server_id from the contents.
        # partition tolerates blank lines and values that themselves contain "="
        for line in existing_lines:
            key, separator, value = line.strip().partition("=")
            if separator and key.strip() == "server_id" and value.strip():
                return value.strip()

    # No usable server id found. Generate a new one
    server_id = str(uuid.uuid4())

    # Persist the new id so later calls return the same value.
    # Append rather than overwrite, to keep any other entries in the env file
    Path(app_env_filename).parent.mkdir(parents=True, exist_ok=True)
    with open(app_env_filename, "a") as f:
        if existing_lines and not existing_lines[-1].endswith("\n"):
            f.write("\n")
        f.write("server_id=" + server_id + "\n")

    return server_id


def log_telemetry(telemetry_type: str, api: str = None, client: str = None, app_config: AppConfig = None):
    """Build a basic app usage telemetry entry with details like client, os, api called.

    Returns an empty list when no app config is passed or telemetry is disabled
    in it. Otherwise returns a dict describing the event. The entry is only
    built here; nothing is sent to the telemetry server by this function.
    """
    # Skip building usage telemetry, if telemetry is disabled via app config
    telemetry_enabled = bool(app_config) and app_config.should_log_telemetry
    if not telemetry_enabled:
        return []

    # Fields only included in the entry when the caller provides them
    optional_fields = {
        # API endpoint on server called by client
        "api": api,
        # Client from which the API was called. E.g Emacs, Obsidian
        "client": client,
    }

    return {
        "telemetry_type": telemetry_type,
        "server_id": get_server_id(),
        "os": platform.system(),
        "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        **{name: value for name, value in optional_fields.items() if value},
    }
5 changes: 5 additions & 0 deletions src/khoj/utils/rawconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,15 @@ class ProcessorConfig(ConfigBase):
conversation: Optional[ConversationProcessorConfig]


# Application level settings of the khoj server
class AppConfig(ConfigBase):
    # Whether usage telemetry should be logged for this install
    should_log_telemetry: bool


# Top level khoj configuration, grouping the content, search, processor and app sections
class FullConfig(ConfigBase):
    content_type: Optional[ContentConfig]
    search_type: Optional[SearchConfig]
    processor: Optional[ProcessorConfig]
    # Defaults to an app config with telemetry logging enabled
    app: Optional[AppConfig] = AppConfig(should_log_telemetry=True)


class SearchResponse(ConfigBase):
Expand Down
4 changes: 3 additions & 1 deletion src/khoj/utils/state.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Standard Packages
import threading
from typing import List
from typing import List, Dict
from packaging import version

# External Packages
Expand All @@ -25,6 +25,8 @@
query_cache = LRU()
search_index_lock = threading.Lock()
SearchType = utils_config.SearchType
telemetry: List[Dict[str, str]] = []
previous_query: str = None

if torch.cuda.is_available():
# Use CUDA GPU
Expand Down
10 changes: 10 additions & 0 deletions src/telemetry/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Get Base Image
# Python 3.11 slim variant of the tiangolo uvicorn-gunicorn image
FROM tiangolo/uvicorn-gunicorn:python3.11-slim
# Record the source repository of this image in its metadata
LABEL org.opencontainers.image.source https://github.com/debanjum/khoj

# Install Telemetry Server Dependencies
# requirements.txt is copied and installed before the application code is added
COPY requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir -r /tmp/requirements.txt

# Copy Application
# The telemetry server module is placed at /app/main.py
# NOTE(review): presumably the location the base image loads the app from; confirm
COPY telemetry.py /app/main.py
2 changes: 2 additions & 0 deletions src/telemetry/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
uvicorn
fastapi
65 changes: 65 additions & 0 deletions src/telemetry/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Standard Packages
import argparse
import logging
from typing import Dict, List

# External Packages
from fastapi import FastAPI
from fastapi import HTTPException
import sqlite3
import uvicorn


# Initialize Global App Variables
app = FastAPI()
# SQLite database file that received telemetry is stored in (relative path)
sqlfile = "khoj.sqlite"
# Use the root logger and let messages of DEBUG level and above through
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)


@app.post("/v1/telemetry")
def v1_telemetry(telemetry_data: List[Dict[str, str]]):
    """Store a batch of usage telemetry entries in the SQLite database.

    Args:
        telemetry_data: Telemetry entries from the POST request body. Each entry
            must contain the "timestamp", "telemetry_type", "server_id" and "os"
            keys. The "api" and "client" keys are optional.

    Returns:
        A status dict confirming that the telemetry was logged.

    Raises:
        HTTPException: With status 400 when the request body is empty or an
            entry is missing a required key.
    """
    # Reject request if no telemetry data received in POST request body
    if not telemetry_data:
        error_message = "Post body is empty. It should contain some telemetry data"
        logger.error(error_message)
        raise HTTPException(status_code=400, detail=error_message)

    # Validate entries up front, so a malformed entry is reported as a client
    # error instead of surfacing as an unhandled KeyError midway through the insert
    required_keys = ("timestamp", "telemetry_type", "server_id", "os")
    rows = []
    for item in telemetry_data:
        missing_keys = [key for key in required_keys if key not in item]
        if missing_keys:
            error_message = f"Telemetry data is missing required fields: {', '.join(missing_keys)}"
            logger.error(error_message)
            raise HTTPException(status_code=400, detail=error_message)
        rows.append(
            (
                item["timestamp"],
                item["telemetry_type"],
                item["server_id"],
                item["os"],
                item.get("api"),
                item.get("client"),
            )
        )

    # Insert received telemetry data into SQLite db
    logger.info(f"Insert row into telemetry table: {telemetry_data}")
    conn = sqlite3.connect(sqlfile)
    try:
        # The connection context manager commits on success and rolls back on
        # error, but does not close the connection. That is done in `finally`
        with conn:
            # Create a table if it doesn't exist
            conn.execute(
                """CREATE TABLE IF NOT EXISTS usage (id INTEGER PRIMARY KEY, time TIMESTAMP, type TEXT, server_id TEXT, os TEXT, api TEXT, client TEXT)"""
            )

            # Log telemetry data
            conn.executemany(
                "INSERT INTO usage (time, type, server_id, os, api, client) VALUES (?, ?, ?, ?, ?, ?)",
                rows,
            )
    finally:
        conn.close()

    return {"status": "ok", "message": "Logged usage telemetry"}


if __name__ == "__main__":
    # Read the interface and port to serve on from the command line
    cli = argparse.ArgumentParser(description="Start Khoj Telemetry Server")
    cli.add_argument("--host", default="127.0.0.1", type=str, help="I.P of telemetry server")
    cli.add_argument("--port", "-p", default=80, type=int, help="Port of telemetry server")
    options = cli.parse_args()

    # Serve the telemetry app with the requested settings
    server_settings = {"host": options.host, "port": options.port, "log_level": "debug"}
    uvicorn.run(app, **server_settings)
Loading