Story
Resolution: Unresolved
[Core] Splunk Search Client and Field Mapping Infrastructure
Summary
Create the foundational infrastructure for reading audit logs from Splunk. This includes a dedicated Splunk search client class for executing queries and retrieving results, plus a comprehensive field mapping layer that transforms Splunk search results back into Quay's Log datatype. Together, these components provide the foundation for all read methods in SplunkLogsModel.
Acceptance Criteria
Splunk Search Client
- [ ] New SplunkSearchClient class created with search job management capabilities
- [ ] Supports both direct Splunk SDK connections and HEC-based deployments; HEC is ingest-only, so reads always go through the Splunk search REST API
- [ ] Can execute SPL queries and retrieve results with pagination
- [ ] Handles search job lifecycle: create, poll for completion, retrieve results, cleanup
- [ ] Configurable search timeout with sensible default (60 seconds)
- [ ] Proper error handling for connection failures, timeouts, and authentication errors
- [ ] SSL/TLS configuration matches existing producer patterns
Field Mapping
- [ ] SplunkLogMapper class created to convert Splunk results to Log objects
- [ ] Datetime strings correctly parsed from Splunk's stored format
- [ ] kind field name mapped to kind_id using existing log entry kinds
- [ ] account field mapped to account_id via user lookup
- [ ] performer field mapped to performer_id via user lookup
- [ ] repository field mapped to repository_id via repository lookup
- [ ] metadata_json field deserialized to Python dict
- [ ] Missing/null fields handled gracefully with appropriate defaults
- [ ] Batch user/repository lookups for performance (avoid N+1 queries)
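The batch-lookup criterion above is the main performance concern when mapping large result pages. A minimal sketch of a single-query username lookup, assuming Quay's peewee-backed User model in data.database (the import path and query shape are assumptions, not verified API):
from typing import Dict, List

from data.database import User  # assumed model location

def batch_lookup_user_ids(usernames: List[str]) -> Dict[str, int]:
    """Resolve many usernames in one query instead of one query per log row."""
    if not usernames:
        return {}
    query = User.select(User.id, User.username).where(User.username << usernames)
    return {row.username: row.id for row in query}
The repository lookup would follow the same shape, additionally filtered by namespace.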
Technical Requirements
Files to Create
- data/logs_model/splunk_search_client.py - Search client class
- data/logs_model/splunk_field_mapper.py - Field mapping utilities
Files to Modify
- data/logs_model/splunk_logs_model.py - Add search client and field mapper initialization
- data/logs_model/datatypes.py - Add Log.for_splunk_log() class method
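For the datatypes.py change, the real Log lives in data/logs_model/datatypes.py and its constructor may differ from what is shown here; the following is a self-contained sketch whose field names are taken from this story's field-mapping table, not from the verified type:
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional

@dataclass(frozen=True)
class Log:
    kind_id: Optional[int]
    account_id: Optional[int]
    performer_id: Optional[int]
    repository_id: Optional[int]
    ip: Optional[str]
    metadata_json: Dict[str, Any]
    datetime: datetime

    @classmethod
    def for_splunk_log(cls, result: Dict[str, Any], kind_map: Dict[str, int],
                       user_map: Dict[str, int], repo_map: Dict[str, int]) -> "Log":
        """Build a Log from one Splunk search result row (keys from the write format)."""
        metadata = result.get("metadata_json") or {}
        if isinstance(metadata, str):  # some configs store escaped JSON strings
            metadata = json.loads(metadata)
        return cls(
            kind_id=kind_map.get(result.get("kind")),
            account_id=user_map.get(result.get("account")),
            performer_id=user_map.get(result.get("performer")),
            repository_id=repo_map.get(result.get("repository")),
            ip=result.get("ip"),
            metadata_json=metadata,
            datetime=datetime.fromisoformat(result["datetime"]),
        )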
SplunkSearchClient Class Design
# data/logs_model/splunk_search_client.py
from dataclasses import dataclass
from typing import Any, Dict, List

@dataclass
class SplunkSearchResults:
    """Results from a Splunk search query."""

    results: List[Dict[str, Any]]  # Raw result rows
    total_count: int  # Total matching events
    offset: int  # Current offset
    has_more: bool  # Whether more results exist

class SplunkSearchClient:
    """
    Client for executing searches against Splunk and retrieving results.
    Uses splunklib SDK for search API operations.
    """

    def __init__(
        self,
        host: str,
        port: int,
        bearer_token: str,
        url_scheme: str = "https",
        verify_ssl: bool = True,
        ssl_ca_path: str = None,
        index_prefix: str = None,
        search_timeout: int = 60,
        max_results: int = 10000,
    ):
        """Initialize Splunk search client with connection parameters."""

    def search(
        self,
        query: str,
        earliest_time: str = None,
        latest_time: str = None,
        max_count: int = None,
        offset: int = 0,
    ) -> SplunkSearchResults:
        """Execute a search query and return results."""

    def search_with_stats(
        self,
        query: str,
        earliest_time: str = None,
        latest_time: str = None,
    ) -> List[Dict]:
        """Execute a search with stats/aggregation commands."""

    def count(
        self,
        query: str,
        earliest_time: str = None,
        latest_time: str = None,
        timeout: int = 30,
    ) -> int:
        """Execute a search and return only the count of matching events."""
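Illustrative usage of the client design above, with hypothetical connection values and index name:
client = SplunkSearchClient(
    host="splunk.example.com",
    port=8089,
    bearer_token="<token>",
    index_prefix="quay_audit",
)
page = client.search(
    'search index="quay_audit*" repository="myrepo" | sort - datetime',
    earliest_time="-7d",
    max_count=20,
)
for row in page.results:
    ...  # hand each raw row to SplunkLogMapper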
SplunkLogMapper Class Design
# data/logs_model/splunk_field_mapper.py
from datetime import datetime
from typing import Any, Dict, List, Optional

from data.logs_model.datatypes import Log

class SplunkLogMapper:
    """
    Maps Splunk search result fields to Quay Log datatype.

    Splunk stores logs with these fields (from splunk_logs_model.py log_action):
    - kind: str (e.g., "push_repo", "pull_repo")
    - account: str (namespace username)
    - performer: str (performer username)
    - repository: str (repository name)
    - ip: str (IP address)
    - metadata_json: dict (serialized JSON object)
    - datetime: str (ISO format timestamp)
    """

    def __init__(self):
        self._kind_map: Optional[Dict[str, int]] = None
        self._user_cache: Dict[str, int] = {}

    def map_logs(
        self, splunk_results: List[Dict[str, Any]], namespace_name: str = None
    ) -> List[Log]:
        """Convert a batch of Splunk results to Log objects."""

    def map_single_log(self, result: Dict[str, Any], id_user_map: Dict[int, Any] = None) -> Log:
        """Convert a single Splunk result to a Log object."""

    def _get_kind_id(self, kind_name: str) -> int:
        """Map kind name to kind_id using cached log entry kinds."""

    def _parse_datetime(self, datetime_str: str) -> datetime:
        """Parse Splunk datetime string to Python datetime."""

    def _parse_metadata(self, metadata_value: Any) -> Dict:
        """Parse metadata field to dict, handling string or dict input."""

    def _batch_lookup_users(self, usernames: List[str]) -> Dict[str, int]:
        """Batch lookup user IDs by username."""

    def _batch_lookup_repositories(
        self, repo_names: List[str], namespace_name: str
    ) -> Dict[str, int]:
        """Batch lookup repository IDs by name within namespace."""
Connection Pattern
Follow the existing SplunkLogsProducer pattern in data/logs_model/logs_producer/splunk_logs_producer.py:
# Reference pattern from splunk_logs_producer.py lines 22-54
import ssl

from splunklib import client

connect_args = {
    "host": host,
    "port": port,
    "token": bearer_token,
    "scheme": url_scheme,
    "verify": verify_ssl,
    "autologin": True,
}
# SSL context setup...
context = ssl.create_default_context()
connect_args["context"] = context
# Establish connection
self._service = client.connect(**connect_args)
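The "SSL context setup..." step is elided above; a hedged approximation of what it typically involves (not a verbatim copy of splunk_logs_producer.py, which stays the authoritative reference):
if url_scheme == "https":
    if verify_ssl:
        # Trust the configured CA bundle when one is provided
        context = ssl.create_default_context(cafile=ssl_ca_path or None)
    else:
        # Explicitly disable verification, e.g. for self-signed test deployments
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    connect_args["context"] = context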
Search Job Pattern
import time

import splunklib.results

def _execute_search(self, spl_query: str, **kwargs) -> splunklib.results.JSONResultsReader:
    """Execute search job and wait for completion."""
    job = self._service.jobs.create(spl_query, **kwargs)
    while not job.is_done():
        time.sleep(0.5)
        job.refresh()
        if self._check_timeout():  # elapsed-time helper; see sketch below
            job.cancel()
            raise SplunkSearchTimeoutError("Search exceeded timeout")
    return splunklib.results.JSONResultsReader(job.results(output_mode="json"))
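The _check_timeout() helper above is not defined by this story; one plausible shape, as two methods on SplunkSearchClient whose attribute names are assumptions rather than existing Quay code:
import time

def _start_timer(self):
    """Record the search start; call when the job is created."""
    self._search_started_at = time.monotonic()

def _check_timeout(self) -> bool:
    """True once the configured search_timeout has elapsed."""
    return (time.monotonic() - self._search_started_at) > self._search_timeout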
Field Mapping Reference
Current Splunk write format from splunk_logs_model.py:84-92:
log_data = {
"kind": kind_name, # str: e.g., "push_repo"
"account": username, # str: namespace username or None
"performer": performer_name, # str: performer username or None
"repository": repo_name, # str: repository name or None
"ip": ip, # str: IP address or None
"metadata_json": metadata_json, # dict: serialized as JSON
"datetime": timestamp, # datetime: Python datetime object
}
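Note the asymmetry this creates for reads: "datetime" is written as a Python datetime object, but the search API returns it as a string, which is why the mapper needs a parsing step. A small illustration using the dateutil convention from the notes below (the sample timestamp format is an assumption):
from dateutil import parser as date_parser

raw = {"kind": "push_repo", "datetime": "2024-05-01T12:34:56.000+00:00"}
parsed = date_parser.parse(raw["datetime"])  # -> timezone-aware datetime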
Implementation Notes
Existing Patterns to Follow
- SSL Configuration: Copy SSL context setup from splunk_logs_producer.py:41-54
- Error Handling: Follow pattern from splunk_logs_producer.py:64-77 for connection errors
- Kind ID Mapping: Use model.log.get_log_entry_kinds() from data/model/log.py (caching sketch after this list)
- User Lookup: Use model.user.get_user() or batch with model.user.get_user_map_by_ids()
- Repository Lookup: Use model.repository.get_repository(namespace, name)
- Datetime Parsing: Use dateutil.parser.parse for flexible datetime parsing
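A sketch of the cached kind lookup combining the conventions above; it assumes get_log_entry_kinds() returns a mapping usable as name -> id, as this story implies:
from typing import Dict, Optional

from data import model

class SplunkLogMapper:  # excerpt; full design shown earlier
    _kind_map: Optional[Dict[str, int]] = None

    def _get_kind_id(self, kind_name: str) -> Optional[int]:
        if self._kind_map is None:  # one DB round-trip, then cached
            self._kind_map = model.log.get_log_entry_kinds()
        return self._kind_map.get(kind_name)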
Code Conventions
- Use type hints for all method signatures
- Document all public methods with docstrings
- Use logger.exception() for error logging with stack traces
- Raise custom exceptions (SplunkSearchError, SplunkSearchTimeoutError; sketched after this list)
- Cache kind_map on first use to avoid repeated database queries
- Use batch lookups for users and repositories when processing multiple logs
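A minimal sketch of the exception hierarchy named in the conventions above:
class SplunkSearchError(Exception):
    """Base error for Splunk search failures (connection, auth, bad query)."""

class SplunkSearchTimeoutError(SplunkSearchError):
    """Raised when a search job exceeds the configured timeout."""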
Edge Cases to Consider
- Connection Refused: Graceful handling when Splunk is unavailable
- Authentication Failure: Clear error message for invalid tokens
- Search Timeout: Cancel job and raise appropriate exception
- Empty Results: Return empty list, not None
- Large Result Sets: Enforce max_results limit to prevent memory issues
- Index Not Found: Handle missing index gracefully
- Unknown kind name: Log warning, use kind_id=0 or skip
- Non-existent user: Account/performer deleted after log was written
- Non-existent repository: Repository deleted after log was written
- Malformed datetime: Use current time or None with warning
- metadata_json as string: Some Splunk configs may store as escaped string
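The last edge case above is worth a concrete sketch, since _parse_metadata must accept either shape:
import json
import logging
from typing import Any, Dict

logger = logging.getLogger(__name__)

def parse_metadata(value: Any) -> Dict:
    """Return metadata as a dict, tolerating None, dict, or escaped-JSON string."""
    if value is None:
        return {}
    if isinstance(value, dict):
        return value
    try:
        return json.loads(value)
    except (ValueError, TypeError):
        logger.warning("Unparseable metadata_json value: %r", value)
        return {}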
Dependencies
- None (this is the foundational story)
External Dependencies
- splunklib Python SDK (already in requirements.txt for write support)
- dateutil for datetime parsing (already in requirements)
Testing Requirements
Unit Tests
Create test files:
- data/logs_model/test/test_splunk_search_client.py
- data/logs_model/test/test_splunk_field_mapper.py
class TestSplunkSearchClient:
    def test_init_creates_connection(self, mock_splunk_client): ...
    def test_search_returns_results(self, mock_splunk_client): ...
    def test_search_handles_timeout(self, mock_splunk_client): ...
    def test_search_handles_connection_error(self, mock_splunk_client): ...
    def test_search_handles_auth_error(self, mock_splunk_client): ...
    def test_search_with_pagination(self, mock_splunk_client): ...
    def test_count_returns_integer(self, mock_splunk_client): ...
    def test_ssl_context_configuration(self, mock_splunk_client): ...

class TestSplunkLogMapper:
    def test_map_logs_returns_log_list(self, sample_splunk_results): ...
    def test_map_logs_maps_all_fields(self, sample_splunk_results): ...
    def test_map_logs_handles_missing_performer(self): ...
    def test_parse_datetime_iso_format(self): ...
    def test_parse_datetime_invalid_returns_default(self): ...
    def test_parse_metadata_dict_input(self): ...
    def test_parse_metadata_json_string(self): ...
    def test_get_kind_id_valid_kind(self, mock_log_kinds): ...
    def test_batch_user_lookup(self, mock_user): ...
    def test_handles_deleted_user(self): ...
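One possible shape for the mock_splunk_client fixture the client tests take; the patch target assumes the client connects via splunklib.client.connect as in the connection pattern above:
from unittest import mock

import pytest

@pytest.fixture
def mock_splunk_client():
    with mock.patch("splunklib.client.connect") as connect:
        connect.return_value = mock.MagicMock(name="splunk_service")
        yield connect.return_value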
Definition of Done
- [ ] Code implemented and follows project conventions
- [ ] All acceptance criteria met
- [ ] Tests written and passing
- [ ] No regressions in existing functionality
- [ ] SplunkSearchClient can be instantiated with test configuration
- [ ] Search method returns results in expected format
- [ ] Timeout handling verified with tests
- [ ] Error handling covers connection, auth, and timeout scenarios
- [ ] Field mapper handles all field types correctly
- [ ] Batch lookups work for efficiency
- [ ] Edge cases (missing users, repos) handled gracefully