1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
import logging
import time
import weakref
from collections import OrderedDict
from threading import RLock
  class LRUCache:
      """Thread-safe LRU cache whose entries also expire after a fixed TTL.

      Entries live in an ``OrderedDict`` ordered from least to most recently
      used; every public method runs under ``self.lock`` (an ``RLock``).

      Args:
          max_size: maximum number of entries kept; the least recently used
              entry is evicted when a ``put`` pushes the size above it.
          ttl: number of seconds an entry stays valid after its last ``put``.
      """

      def __init__(self, max_size=1000, ttl=3600):
          self.max_size = max_size
          self.ttl = ttl
          self.cache = OrderedDict()  # key -> value, LRU entry first
          # key -> time.monotonic() of the last put(). Monotonic so that
          # wall-clock adjustments cannot mass-expire or immortalize entries.
          self.timestamps = {}
          self.lock = RLock()
          # Lookup counters backing the 'hit_rate' reported by get_stats().
          self._hits = 0
          self._misses = 0

      def get(self, key, default=None):
          """Return the cached value for ``key`` and mark it most recently used.

          Args:
              key: cache key.
              default: value returned on a miss (absent or expired key);
                  ``None`` unless given, which matches the previous behavior.

          Returns:
              The cached value, or ``default`` on a miss. An expired entry is
              removed as a side effect.
          """
          with self.lock:
              if key in self.cache and not self._is_expired(key):
                  self._hits += 1
                  self.cache.move_to_end(key)
                  return self.cache[key]
              # Absent or expired; _remove() is a no-op for absent keys.
              self._remove(key)
              self._misses += 1
              return default

      def put(self, key, value):
          """Insert or update ``key``, refresh its TTL and evict LRU overflow."""
          with self.lock:
              self.cache[key] = value
              # No-op for a fresh key (already last); promotes an existing one.
              self.cache.move_to_end(key)
              self.timestamps[key] = time.monotonic()
              # The emptiness check stops a non-positive max_size from calling
              # next() on an empty iterator (StopIteration).
              while self.cache and len(self.cache) > self.max_size:
                  self._remove(next(iter(self.cache)))

      def _is_expired(self, key):
          """Return True if ``key`` has no timestamp or is older than ``ttl``.

          Callers must hold ``self.lock``.
          """
          stamp = self.timestamps.get(key)
          if stamp is None:
              return True
          return time.monotonic() - stamp > self.ttl

      def _remove(self, key):
          """Drop ``key`` from both maps; silently ignores absent keys."""
          self.cache.pop(key, None)
          self.timestamps.pop(key, None)

      def clear_expired(self):
          """Remove every entry whose TTL has elapsed."""
          with self.lock:
              # Materialize first: the dict cannot change while iterated.
              expired_keys = [key for key in self.cache if self._is_expired(key)]
              for key in expired_keys:
                  self._remove(key)

      def get_stats(self):
          """Return a snapshot of size, capacity and lookup hit rate.

          ``hit_rate`` is hits / (hits + misses) over all ``get`` calls so far,
          or ``0`` before the first lookup.
          """
          with self.lock:
              lookups = self._hits + self._misses
              return {
                  'size': len(self.cache),
                  'max_size': self.max_size,
                  'hit_rate': self._hits / lookups if lookups else 0,
              }
  class ImprovedDataProcessor:
      """Process data items, memoizing results in a TTL-bounded LRU cache.

      A daemon thread started from ``__init__`` periodically evicts expired
      cache entries, trims ``data_buffer`` and logs memory usage. The thread
      holds only a weak reference to the processor, so a processor that is no
      longer used can still be garbage-collected; ``close()`` stops it early.

      NOTE(review): ``_resource_manager``, ``_cleanup_resources``,
      ``_expensive_computation`` and ``CallbackHandler`` are not defined in
      this class; presumably a subclass or another module provides them.
      """

      def __init__(self, cache_size=1000, cache_ttl=3600, cleanup_interval=300):
          """Create the processor and start its background cleanup thread.

          Args:
              cache_size: maximum number of cached results.
              cache_ttl: seconds a cached result stays valid.
              cleanup_interval: seconds between background cleanup passes
                  (defaults to the previously hard-coded 300).
          """
          self.cache = LRUCache(max_size=cache_size, ttl=cache_ttl)
          self.callbacks = weakref.WeakSet()
          self.data_buffer = []
          self.max_buffer_size = 100
          self.cleanup_interval = cleanup_interval
          self._setup_cleanup_timer()

      def process_data(self, data_id, data):
          """Return the processed result for ``data``, served from cache if possible.

          Args:
              data_id: cache key identifying ``data``.
              data: input handed to ``_resource_manager`` and the computation.

          Returns:
              The (possibly cached) result. A result of ``None`` is never served
              from cache, because ``None`` also signals a cache miss.

          Raises:
              Whatever processing raises; it is logged and re-raised after
              ``_cleanup_resources()`` has been attempted.
          """
          try:
              cached_result = self.cache.get(data_id)
              if cached_result is not None:
                  return cached_result

              with self._resource_manager(data) as resources:
                  result = self._safe_process_data(data_id, data, resources)
                  self.cache.put(data_id, result)
                  return result
          except Exception as e:
              logging.error(f"数据处理失败: {e}", exc_info=True)
              try:
                  self._cleanup_resources()
              except Exception as cleanup_error:
                  # Log, but never let a cleanup failure replace the original error.
                  logging.error(f"资源清理失败: {cleanup_error}", exc_info=True)
              raise
          finally:
              # Runs on every exit path, including cache hits.
              self._cleanup_buffer()

      def _safe_process_data(self, data_id, data, resources):
          """Run the expensive computation, always releasing the intermediate object.

          Args:
              data_id: identifier passed to ``CallbackHandler``.
              data: input to ``_expensive_computation``.
              resources: object yielded by ``_resource_manager``; must provide
                  ``get_intermediate_object()`` and ``release_intermediate_object()``.
          """
          intermediate_data = resources.get_intermediate_object()
          try:
              result = self._expensive_computation(data)
              # Passes a weak reference rather than ``self``, presumably so the
              # handler does not keep the processor alive.
              callback = CallbackHandler(data_id, weakref.ref(self))
              # NOTE(review): ``self.callbacks`` is a WeakSet and ``callback`` is
              # only referenced locally, so the entry vanishes once this method
              # returns unless CallbackHandler keeps itself alive elsewhere.
              # Confirm this is intended.
              self.callbacks.add(callback)
              return result
          finally:
              resources.release_intermediate_object(intermediate_data)

      def _cleanup_buffer(self):
          """Trim ``data_buffer`` to its newest half once it exceeds ``max_buffer_size``.

          Forces a ``gc.collect()`` after trimming.
          """
          if len(self.data_buffer) > self.max_buffer_size:
              # Parsed as (-max_buffer_size) // 2: keeps the last
              # ceil(max_buffer_size / 2) items.
              self.data_buffer = self.data_buffer[-self.max_buffer_size//2:]

              import gc
              gc.collect()

      def _setup_cleanup_timer(self):
          """Start the daemon thread that periodically cleans up this processor.

          Every ``cleanup_interval`` seconds, the thread does three things:
          clears expired cache entries, trims the buffer, and logs memory usage.
          It keeps only a weak reference to ``self``. It exits when the
          processor is garbage-collected or ``close()`` is called.
          """
          import threading

          processor_ref = weakref.ref(self)
          stop_event = threading.Event()
          interval = getattr(self, 'cleanup_interval', 300)

          def cleanup_task():
              while not stop_event.is_set():
                  processor = processor_ref()
                  if processor is None:
                      return
                  try:
                      processor.cache.clear_expired()
                      processor._cleanup_buffer()
                      processor._log_memory_usage()
                  except Exception as e:
                      logging.error(f"清理任务异常: {e}")
                  finally:
                      # Drop the strong reference before waiting so the
                      # processor can be collected in the meantime.
                      del processor
                  # Unlike time.sleep(), returns early once stop_event is set.
                  stop_event.wait(interval)

          # Wake the thread (and let it exit) as soon as the processor is collected.
          weakref.finalize(self, stop_event.set)
          self._stop_event = stop_event
          self._cleanup_thread = threading.Thread(target=cleanup_task, daemon=True)
          self._cleanup_thread.start()

      def close(self):
          """Stop the background cleanup thread; safe to call repeatedly.

          A no-op when the thread was never started.
          """
          stop_event = getattr(self, '_stop_event', None)
          if stop_event is not None:
              stop_event.set()

      def _log_memory_usage(self):
          """Log process RSS (MB), cache occupancy and buffer length at INFO level.

          Imports the third-party ``psutil`` package lazily. An ImportError
          propagates to the caller; the cleanup thread catches and logs it.
          """
          import psutil
          import os

          process = psutil.Process(os.getpid())
          memory_info = process.memory_info()

          cache_stats = self.cache.get_stats()

          logging.info(
              f"内存使用情况 - RSS: {memory_info.rss / 1024 / 1024:.2f}MB, "
              f"缓存大小: {cache_stats['size']}/{cache_stats['max_size']}, "
              f"缓冲区大小: {len(self.data_buffer)}"
          )
   |