Project Quay · PROJQUAY-10317

[Core] Splunk Search Client and Field Mapping Infrastructure

    • Type: Story
    • Priority: Undefined
    • Resolution: Unresolved
    • Component: Security & Compliance

      Summary

      Create the foundational infrastructure for reading audit logs from Splunk: a dedicated Splunk search client class for executing queries and retrieving results, plus a field mapping layer that transforms Splunk search results back into Quay's Log datatype. Together, these components underpin all read methods in SplunkLogsModel.

      Acceptance Criteria

      Splunk Search Client

      • [ ] New SplunkSearchClient class created with search job management capabilities
      • [ ] Supports both direct Splunk SDK connection and HEC-based deployments (via search API)
      • [ ] Can execute SPL queries and retrieve results with pagination
      • [ ] Handles search job lifecycle: create, poll for completion, retrieve results, cleanup
      • [ ] Configurable search timeout with sensible default (60 seconds)
      • [ ] Proper error handling for connection failures, timeouts, and authentication errors
      • [ ] SSL/TLS configuration matches existing producer patterns

      Field Mapping

      • [ ] SplunkLogMapper class created to convert Splunk results to Log objects
      • [ ] Datetime strings correctly parsed from Splunk's stored format
      • [ ] kind field name mapped to kind_id using existing log entry kinds
      • [ ] account field mapped to account_id via user lookup
      • [ ] performer field mapped to performer_id via user lookup
      • [ ] repository field mapped to repository_id via repository lookup
      • [ ] metadata_json field deserialized to Python dict
      • [ ] Missing/null fields handled gracefully with appropriate defaults
      • [ ] Batch user/repository lookups for performance (avoid N+1 queries)
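
      As a sketch of the batch-lookup item above, assuming Quay's peewee User model in data/database.py (names to be confirmed against the codebase):

      from typing import Dict, List

      from data.database import User

      def _batch_lookup_users(self, usernames: List[str]) -> Dict[str, int]:
          # Single IN query instead of one query per username (avoids N+1).
          rows = User.select(User.id, User.username).where(User.username << usernames)
          return {row.username: row.id for row in rows}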

      Technical Requirements

      Files to Create

      • data/logs_model/splunk_search_client.py - Search client class
      • data/logs_model/splunk_field_mapper.py - Field mapping utilities

      Files to Modify

      • data/logs_model/splunk_logs_model.py - Add search client and field mapper initialization
      • data/logs_model/datatypes.py - Add Log.for_splunk_log() class method

      SplunkSearchClient Class Design

      # data/logs_model/splunk_search_client.py
      
      from dataclasses import dataclass
      from typing import Any, Dict, List, Optional
      
      @dataclass
      class SplunkSearchResults:
          """Results from a Splunk search query."""
          results: List[Dict[str, Any]]  # Raw result rows
          total_count: int               # Total matching events
          offset: int                    # Current offset
          has_more: bool                 # Whether more results exist
      
      class SplunkSearchClient:
          """
          Client for executing searches against Splunk and retrieving results.
          Uses splunklib SDK for search API operations.
          """
      
          def __init__(
              self,
              host: str,
              port: int,
              bearer_token: str,
              url_scheme: str = "https",
              verify_ssl: bool = True,
              ssl_ca_path: Optional[str] = None,
              index_prefix: Optional[str] = None,
              search_timeout: int = 60,
              max_results: int = 10000,
          ):
              """Initialize Splunk search client with connection parameters."""
      
          def search(
              self,
              query: str,
              earliest_time: Optional[str] = None,
              latest_time: Optional[str] = None,
              max_count: Optional[int] = None,
              offset: int = 0,
          ) -> SplunkSearchResults:
              """Execute a search query and return results."""

          def search_with_stats(
              self,
              query: str,
              earliest_time: Optional[str] = None,
              latest_time: Optional[str] = None,
          ) -> List[Dict[str, Any]]:
              """Execute a search with stats/aggregation commands."""

          def count(
              self,
              query: str,
              earliest_time: Optional[str] = None,
              latest_time: Optional[str] = None,
              timeout: int = 30,
          ) -> int:
              """Execute a search and return only the count of matching events."""
      

      SplunkLogMapper Class Design

      # data/logs_model/splunk_field_mapper.py

      from datetime import datetime
      from typing import Any, Dict, List, Optional

      from data.logs_model.datatypes import Log

      class SplunkLogMapper:
          """
          Maps Splunk search result fields to Quay Log datatype.
      
          Splunk stores logs with these fields (from splunk_logs_model.py log_action):
          - kind: str (e.g., "push_repo", "pull_repo")
          - account: str (namespace username)
          - performer: str (performer username)
          - repository: str (repository name)
          - ip: str (IP address)
          - metadata_json: dict (serialized JSON object)
          - datetime: str (ISO format timestamp)
          """
      
          def __init__(self):
              self._kind_map: Optional[Dict[str, int]] = None
              self._user_cache: Dict[str, int] = {}
      
          def map_logs(self, splunk_results: List[Dict[str, Any]], namespace_name: Optional[str] = None) -> List[Log]:
              """Convert a batch of Splunk results to Log objects."""

          def map_single_log(self, result: Dict[str, Any], id_user_map: Optional[Dict[int, Any]] = None) -> Log:
              """Convert a single Splunk result to a Log object."""
              """Convert a single Splunk result to a Log object."""
      
          def _get_kind_id(self, kind_name: str) -> int:
              """Map kind name to kind_id using cached log entry kinds."""
      
          def _parse_datetime(self, datetime_str: str) -> datetime:
              """Parse Splunk datetime string to Python datetime."""
      
          def _parse_metadata(self, metadata_value: Any) -> Dict:
              """Parse metadata field to dict, handling string or dict input."""
      
          def _batch_lookup_users(self, usernames: List[str]) -> Dict[str, int]:
              """Batch lookup user IDs by username."""
      
          def _batch_lookup_repositories(self, repo_names: List[str], namespace_name: str) -> Dict[str, int]:
              """Batch lookup repository IDs by name within namespace."""
      

      Connection Pattern

      Follow the existing SplunkLogsProducer pattern in data/logs_model/logs_producer/splunk_logs_producer.py:

      # Reference pattern from splunk_logs_producer.py lines 22-54
      import ssl

      from splunklib import client

      connect_args = {
          "host": host,
          "port": port,
          "token": bearer_token,
          "scheme": url_scheme,
          "verify": verify_ssl,
          "autologin": True,
      }
      # SSL context setup...
      context = ssl.create_default_context()
      connect_args["context"] = context
      
      # Establish connection
      self._service = client.connect(**connect_args)
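
      The elided "SSL context setup" above might expand to the sketch below; the authoritative logic lives in splunk_logs_producer.py lines 41-54 and should be copied from there rather than from this illustration:

      if ssl_ca_path:
          # Trust a custom CA bundle when one is configured.
          context = ssl.create_default_context(cafile=ssl_ca_path)
      else:
          context = ssl.create_default_context()
      if not verify_ssl:
          # Disable verification only when the operator explicitly opts out.
          context.check_hostname = False
          context.verify_mode = ssl.CERT_NONE
      connect_args["context"] = context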
      

      Search Job Pattern

      # Within SplunkSearchClient. Requires: import time, import splunklib.results
      def _execute_search(self, spl_query: str, **kwargs) -> splunklib.results.JSONResultsReader:
          """Execute search job and wait for completion."""
          job = self._service.jobs.create(spl_query, **kwargs)

          # Poll until the job completes, enforcing the configured search timeout.
          while not job.is_done():
              time.sleep(0.5)
              job.refresh()
              if self._check_timeout():  # helper comparing elapsed time to search_timeout
                  job.cancel()
                  raise SplunkSearchTimeoutError("Search exceeded timeout")

          return splunklib.results.JSONResultsReader(job.results(output_mode="json"))
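
      Pagination can reuse the same job: the results endpoint accepts count and offset parameters, and JSONResultsReader yields diagnostic Message objects alongside result dicts, so rows should be filtered by type. A sketch (page_size and offset are caller-supplied):

      reader = splunklib.results.JSONResultsReader(
          job.results(output_mode="json", count=page_size, offset=offset)
      )
      rows = [row for row in reader if isinstance(row, dict)]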
      

      Field Mapping Reference

      Current Splunk write format from splunk_logs_model.py:84-92:

      log_data = {
          "kind": kind_name,              # str: e.g., "push_repo"
          "account": username,            # str: namespace username or None
          "performer": performer_name,    # str: performer username or None
          "repository": repo_name,        # str: repository name or None
          "ip": ip,                       # str: IP address or None
          "metadata_json": metadata_json, # dict: serialized as JSON
          "datetime": timestamp,          # datetime: Python datetime object
      }
      

      Implementation Notes

      Existing Patterns to Follow

      1. SSL Configuration: Copy SSL context setup from splunk_logs_producer.py:41-54
      2. Error Handling: Follow pattern from splunk_logs_producer.py:64-77 for connection errors
      3. Kind ID Mapping: Use model.log.get_log_entry_kinds() from data/model/log.py
      4. User Lookup: Use model.user.get_user() or batch with model.user.get_user_map_by_ids()
      5. Repository Lookup: Use model.repository.get_repository(namespace, name)
      6. Datetime Parsing: Use dateutil.parser.parse for flexible datetime parsing
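
      A minimal sketch combining notes 3 and 6, assuming get_log_entry_kinds() returns a name-to-id mapping (verify against data/model/log.py):

      from datetime import datetime

      from dateutil import parser as dateutil_parser

      from data import model

      def _get_kind_id(self, kind_name: str) -> int:
          # Cache the kind map on first use to avoid repeated database queries.
          if self._kind_map is None:
              self._kind_map = model.log.get_log_entry_kinds()
          return self._kind_map[kind_name]

      def _parse_datetime(self, datetime_str: str) -> datetime:
          # dateutil tolerates ISO-8601 and most Splunk-rendered timestamp formats.
          return dateutil_parser.parse(datetime_str)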

      Code Conventions

      • Use type hints for all method signatures
      • Document all public methods with docstrings
      • Use logger.exception() for error logging with stack traces
      • Raise custom exceptions (SplunkSearchError, SplunkSearchTimeoutError)
      • Cache kind_map on first use to avoid repeated database queries
      • Use batch lookups for users and repositories when processing multiple logs
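
      A minimal sketch of the custom exceptions named above (the subclass relationship is a suggested design, not an existing API):

      class SplunkSearchError(Exception):
          """Base error for Splunk search operations."""

      class SplunkSearchTimeoutError(SplunkSearchError):
          """Raised when a search job exceeds the configured timeout."""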

      Edge Cases to Consider

      1. Connection Refused: Graceful handling when Splunk is unavailable
      2. Authentication Failure: Clear error message for invalid tokens
      3. Search Timeout: Cancel job and raise appropriate exception
      4. Empty Results: Return empty list, not None
      5. Large Result Sets: Enforce max_results limit to prevent memory issues
      6. Index Not Found: Handle missing index gracefully
      7. Unknown kind name: Log warning, use kind_id=0 or skip
      8. Non-existent user: Account/performer deleted after log was written
      9. Non-existent repository: Repository deleted after log was written
      10. Malformed datetime: Use current time or None with warning
      11. metadata_json as string: Some Splunk configs may store as escaped string
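
      A sketch of the mapper method covering edge case 11 (and the missing-field case), parsing metadata that may arrive as a dict, a JSON string, or be absent:

      import json
      import logging
      from typing import Any, Dict

      logger = logging.getLogger(__name__)

      def _parse_metadata(self, metadata_value: Any) -> Dict[str, Any]:
          if metadata_value is None:
              return {}
          if isinstance(metadata_value, dict):
              return metadata_value
          try:
              # Some Splunk configurations store metadata as an escaped JSON string.
              return json.loads(metadata_value)
          except (TypeError, ValueError):
              logger.warning("Could not parse metadata_json value: %r", metadata_value)
              return {}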

      Dependencies

      • None (this is the foundational story)

      External Dependencies

      • splunklib Python SDK (already in requirements.txt for write support)
      • dateutil for datetime parsing (already in requirements)

      Testing Requirements

      Unit Tests

      Create test files:
      - data/logs_model/test/test_splunk_search_client.py
      - data/logs_model/test/test_splunk_field_mapper.py

      class TestSplunkSearchClient:
          def test_init_creates_connection(self, mock_splunk_client): ...
          def test_search_returns_results(self, mock_splunk_client): ...
          def test_search_handles_timeout(self, mock_splunk_client): ...
          def test_search_handles_connection_error(self, mock_splunk_client): ...
          def test_search_handles_auth_error(self, mock_splunk_client): ...
          def test_search_with_pagination(self, mock_splunk_client): ...
          def test_count_returns_integer(self, mock_splunk_client): ...
          def test_ssl_context_configuration(self, mock_splunk_client): ...
      
      class TestSplunkLogMapper:
          def test_map_logs_returns_log_list(self, sample_splunk_results): ...
          def test_map_logs_maps_all_fields(self, sample_splunk_results): ...
          def test_map_logs_handles_missing_performer(self): ...
          def test_parse_datetime_iso_format(self): ...
          def test_parse_datetime_invalid_returns_default(self): ...
          def test_parse_metadata_dict_input(self): ...
          def test_parse_metadata_json_string(self): ...
          def test_get_kind_id_valid_kind(self, mock_log_kinds): ...
          def test_batch_user_lookup(self, mock_user): ...
          def test_handles_deleted_user(self): ...
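
      A hypothetical sample_splunk_results fixture, shaped like the write format documented above (all field values are illustrative):

      import pytest

      @pytest.fixture
      def sample_splunk_results():
          return [
              {
                  "kind": "push_repo",
                  "account": "devtable",
                  "performer": "devtable",
                  "repository": "simple",
                  "ip": "192.168.1.1",
                  "metadata_json": '{"tag": "latest"}',
                  "datetime": "2024-01-01T12:00:00",
              }
          ]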
      

      Definition of Done

      • [ ] Code implemented and follows project conventions
      • [ ] All acceptance criteria met
      • [ ] Tests written and passing
      • [ ] No regressions in existing functionality
      • [ ] SplunkSearchClient can be instantiated with test configuration
      • [ ] Search method returns results in expected format
      • [ ] Timeout handling verified with tests
      • [ ] Error handling covers connection, auth, and timeout scenarios
      • [ ] Field mapper handles all field types correctly
      • [ ] Batch lookups work for efficiency
      • [ ] Edge cases (missing users, repos) handled gracefully

              Assignee: hgovinda Harish Govindarajulu
              Reporter: hgovinda Harish Govindarajulu