-
Notifications
You must be signed in to change notification settings - Fork 243
/
strategic_analysis_parser.py
219 lines (184 loc) · 7.92 KB
/
strategic_analysis_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
from typing import List, Dict, Optional, Union
import re
import logging
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ResearchFocus:
    """Represents a specific area of research focus.

    Attributes:
        area: Short description of the research area.
        priority: Integer priority assigned to the area.
        source_query: Query this focus was derived from; empty by default.
        timestamp: Creation time formatted as ``YYYY-MM-DD HH:MM:SS``.
            Filled in from the local clock when left empty.
        search_queries: Search queries attached to this focus. ``None``
            (the default) is replaced by a fresh empty list.
    """
    area: str
    priority: int
    source_query: str = ""
    timestamp: str = ""
    # Annotated Optional so the type matches the ``None`` default. ``None`` is
    # used instead of ``[]`` so instances never share one mutable default list.
    search_queries: Optional[List[str]] = None

    def __post_init__(self):
        """Fill in the timestamp and the per-instance query list if unset."""
        if not self.timestamp:
            self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        if self.search_queries is None:
            self.search_queries = []
@dataclass
class AnalysisResult:
    """Bundle holding everything produced by one analysis run.

    Attributes:
        original_question: The question text associated with the analysis.
        focus_areas: The research focus entries for the analysis.
        raw_response: The response text the analysis was built from.
        timestamp: Creation time as ``YYYY-MM-DD HH:MM:SS``; taken from the
            local clock when the caller leaves it empty.
        confidence_score: Score attached to the analysis; ``0.0`` by default.
    """
    original_question: str
    focus_areas: List[ResearchFocus]
    raw_response: str
    timestamp: str = ""
    confidence_score: float = 0.0

    def __post_init__(self):
        """Stamp the result with the current time unless one was supplied."""
        if self.timestamp:
            # A caller-provided timestamp is kept untouched.
            return
        self.timestamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class StrategicAnalysisParser:
    """Parse an LLM "strategic analysis" response into structured data.

    The parser looks for an original-question section and a research-gaps
    section (the accepted header spellings live in ``self.patterns``),
    splits the gaps section into numbered items and turns every item into a
    ``ResearchFocus`` whose priority is clamped to the 1-5 range.
    """

    # Inclusive priority bounds, and the value used when none is stated.
    MIN_PRIORITY = 1
    MAX_PRIORITY = 5
    DEFAULT_PRIORITY = 3

    # Annotations that name ResearchFocus / AnalysisResult are written as
    # forward references, so defining this class does not require those names
    # to be bound yet; they are only looked up when the methods run.

    def __init__(self):
        """Build the regex pattern table and the instance logger."""
        self.patterns = {
            'original_question': [
                r"(?i)original question analysis:\s*(.*?)(?=research gap|$)",
                r"(?i)original query:\s*(.*?)(?=research gap|$)",
                r"(?i)research question:\s*(.*?)(?=research gap|$)",
                r"(?i)topic analysis:\s*(.*?)(?=research gap|$)"
            ],
            'research_gaps': [
                r"(?i)research gaps?:\s*",
                r"(?i)gaps identified:\s*",
                r"(?i)areas for research:\s*",
                r"(?i)investigation areas:\s*"
            ],
            'priority': [
                r"(?i)priority:\s*(\d+)",
                r"(?i)priority level:\s*(\d+)",
                r"(?i)\(priority:\s*(\d+)\)",
                r"(?i)importance:\s*(\d+)"
            ]
        }
        self.logger = logging.getLogger(__name__)

    def parse_analysis(self, llm_response: str) -> Optional["AnalysisResult"]:
        """Parse a raw LLM response into an ``AnalysisResult``.

        Args:
            llm_response: The unmodified text returned by the model.

        Returns:
            The parsed result, or ``None`` if any step raised (the error is
            logged). When no question can be extracted the result carries the
            placeholder text "Original question extraction failed".
        """
        try:
            cleaned_response = self._clean_text(llm_response)

            extracted_question = self._extract_original_question(cleaned_response)
            if not extracted_question:
                self.logger.warning("Failed to extract original question")

            focus_areas = self._extract_research_areas(cleaned_response)
            focus_areas = self._normalize_focus_areas(focus_areas)

            # Score what was actually extracted. Scoring the placeholder text
            # would award full question-quality credit to a failed extraction.
            confidence_score = self._calculate_confidence_score(
                extracted_question, focus_areas
            )

            return AnalysisResult(
                original_question=(
                    extracted_question or "Original question extraction failed"
                ),
                focus_areas=focus_areas,
                raw_response=llm_response,
                confidence_score=confidence_score
            )
        except Exception as e:
            # Deliberate best-effort boundary: callers get None, never a raise.
            self.logger.error("Error in parse_analysis: %s", e)
            return None

    def _clean_text(self, text: str) -> str:
        """Normalize whitespace and list numbering while keeping line breaks.

        Line structure is preserved because the research-area splitter relies
        on numbered items starting on their own line.

        Args:
            text: Text to normalize.

        Returns:
            The text with unified line endings, at most one blank line in a
            row, runs of horizontal whitespace collapsed to one space,
            line-leading ``N)`` markers rewritten as ``N.``, and surrounding
            whitespace stripped.
        """
        # Unify line endings so the newline-based rules below see every break.
        text = text.replace('\r\n', '\n').replace('\r', '\n')
        # Drop trailing blanks so whitespace-only lines count as empty lines.
        text = re.sub(r'[^\S\n]+\n', '\n', text)
        text = re.sub(r'\n{3,}', '\n\n', text)
        # Collapse horizontal whitespace only; newlines must survive.
        text = re.sub(r'[^\S\n]{2,}', ' ', text)
        # "1) item" -> "1. item", but only for list markers at a line start so
        # text such as "(priority: 3)" is left alone.
        text = re.sub(r'(?m)^([^\S\n]*\d+)\)(?=\s)', r'\1.', text)
        return text.strip()

    def _extract_original_question(self, text: str) -> str:
        """Return the original-question text, or ``""`` when none is found.

        The patterns in ``self.patterns['original_question']`` are tried in
        order; the first one that matches wins.
        """
        for pattern in self.patterns['original_question']:
            match = re.search(pattern, text, re.DOTALL)
            if match:
                return self._clean_text(match.group(1))
        return ""

    def _split_numbered_sections(self, text: str) -> List[str]:
        """Return the numbered items that follow the research-gaps header.

        The header patterns in ``self.patterns['research_gaps']`` are tried
        in order and the first match decides where the list starts.

        Args:
            text: Cleaned response text.

        Returns:
            The stripped, non-empty pieces of text after the header, split at
            every line-leading ``N.`` / ``N)`` marker. Empty when no header
            is present.
        """
        for pattern in self.patterns['research_gaps']:
            gap_match = re.search(pattern, text)
            if not gap_match:
                continue
            body = text[gap_match.end():]
            # Anchor on line starts (MULTILINE) instead of a preceding "\n":
            # the header pattern already consumed the newline before item 1,
            # so requiring one would leave "1. " glued to the first area.
            pieces = re.split(r'(?m)^\s*\d+[.)]\s+', body)
            return [piece.strip() for piece in pieces if piece.strip()]
        return []

    def _extract_research_areas(self, text: str) -> List["ResearchFocus"]:
        """Build a ``ResearchFocus`` for every valid numbered item in ``text``."""
        areas = []
        for section in self._split_numbered_sections(text):
            focus = self._parse_research_focus(section)
            if focus and self._is_valid_focus(focus):
                areas.append(focus)
        return areas

    def _parse_research_focus(self, text: str) -> Optional["ResearchFocus"]:
        """Parse one list item into a ``ResearchFocus``.

        The first line of ``text`` becomes the area; the priority is searched
        for in the whole item. Returns ``None`` (after logging) on failure.
        """
        try:
            area = text.split('\n')[0].strip()
            priority = self._extract_priority(text)
            return ResearchFocus(
                area=area,
                priority=priority
            )
        except Exception as e:
            self.logger.error("Error parsing research focus: %s", e)
            return None

    def _extract_priority(self, text: str) -> int:
        """Return the priority stated in ``text``, clamped to the valid range.

        Falls back to ``DEFAULT_PRIORITY`` when no priority pattern matches.
        """
        for pattern in self.patterns['priority']:
            priority_match = re.search(pattern, text)
            if priority_match:
                try:
                    priority = int(priority_match.group(1))
                except ValueError:
                    # e.g. a digit run too long for int(); try the next pattern.
                    continue
                return max(self.MIN_PRIORITY, min(self.MAX_PRIORITY, priority))
        return self.DEFAULT_PRIORITY

    def _is_valid_focus(self, focus: "ResearchFocus") -> bool:
        """Return True when the focus has an area and an in-range priority."""
        if not focus.area:
            return False
        return self.MIN_PRIORITY <= focus.priority <= self.MAX_PRIORITY

    def _normalize_focus_areas(self, areas: List["ResearchFocus"]) -> List["ResearchFocus"]:
        """Clamp priorities, drop blank areas and sort by priority.

        Note that the priority of each kept entry is clamped in place.

        Returns:
            The valid entries ordered by descending priority value; entries
            with equal priority keep their input order.
        """
        normalized = []
        for area in areas:
            if not area.area.strip():
                continue
            area.priority = max(self.MIN_PRIORITY, min(self.MAX_PRIORITY, area.priority))
            if self._is_valid_focus(area):
                normalized.append(area)
        # No filler areas are added; only what was parsed is returned.
        normalized.sort(key=lambda x: x.priority, reverse=True)
        return normalized

    def _calculate_confidence_score(self, question: str, areas: List["ResearchFocus"]) -> float:
        """Score the analysis quality on a 0.0-1.0 scale.

        Weights: 0.3 for a question of at least three words, 0.35 for the
        share of valid areas, 0.35 for how varied the priorities are.

        Returns:
            The score rounded to two decimals.
        """
        score = 0.0

        if question and len(question.split()) >= 3:
            score += 0.3

        if areas:
            num_areas = len(areas)

            valid_areas = sum(1 for a in areas if self._is_valid_focus(a))
            score += 0.35 * (valid_areas / num_areas)

            # Only MAX-MIN+1 distinct priorities exist, so measure diversity
            # against what is attainable; otherwise long lists could never
            # reach full credit. The ratio is capped for unclamped input.
            unique_priorities = len({a.priority for a in areas})
            attainable = min(num_areas, self.MAX_PRIORITY - self.MIN_PRIORITY + 1)
            score += 0.35 * min(1.0, unique_priorities / attainable)

        return round(score, 2)

    def format_analysis_result(self, result: "AnalysisResult") -> str:
        """Render ``result`` as a human-readable, newline-joined report."""
        formatted = [
            "Strategic Analysis Result",
            "=" * 80,
            f"\nOriginal Question Analysis:\n{result.original_question}\n",
            f"Analysis Confidence Score: {result.confidence_score}",
            "\nResearch Focus Areas:"
        ]
        for i, focus in enumerate(result.focus_areas, 1):
            formatted.extend([
                f"\n{i}. {focus.area}",
                f" Priority: {focus.priority}"
            ])
        return "\n".join(formatted)