thread safety catchup
authormartinko <gamato@users.sf.net>
Wed, 27 Feb 2013 11:18:18 +0000 (12:18 +0100)
committermartinko <gamato@users.sf.net>
Wed, 27 Feb 2013 11:18:18 +0000 (12:18 +0100)
python/londiste/setup.py
python/pgq/cascade/admin.py

index d7463de7030e3b59a4a862b979087bb24933a362..8a64cc8d4b55e7534a97dfbdc4cdc03de68ddc2e 100644 (file)
@@ -619,6 +619,7 @@ class LondisteSetup(CascadeAdmin):
 
     def load_extra_status(self, curs, node):
         """Fetch extra info."""
+        # must be thread-safe (!)
         CascadeAdmin.load_extra_status(self, curs, node)
         curs.execute("select * from londiste.get_table_list(%s)", [self.queue_name])
         n_ok = n_half = n_ign = 0
index c47ec02cd5a83af7f5a758de6e449d3f17c53603..a8236816884d87b5415455c615d409031872120e 100644 (file)
@@ -391,12 +391,14 @@ class CascadeAdmin(skytools.AdminScript):
 
     def load_node_status (self, name, location):
         """ Load node info & status """
+        # must be thread-safe (!)
         if not self.node_alive(name):
             node = NodeInfo(self.queue_name, None, node_name = name)
             return node
         try:
-            conn = 'look_db.%s' % name
-            db = self.get_database(conn, connstr = location, autocommit = 1)
+            db = None
+            db = skytools.connect_database (location)
+            db.set_isolation_level (skytools.I_AUTOCOMMIT)
             curs = db.cursor()
             curs.execute("select * from pgq_node.get_node_info(%s)", [self.queue_name])
             node = NodeInfo(self.queue_name, curs.fetchone())
@@ -406,7 +408,8 @@ class CascadeAdmin(skytools.AdminScript):
             msg = str(d).strip().split('\n', 1)[0].strip()
             print('Node %r failure: %s' % (name, msg))
             node = NodeInfo(self.queue_name, None, node_name = name)
-        self.close_database(conn)
+        finally:
+            if db: db.close()
         return node
 
     def cmd_node_status(self):
@@ -432,6 +435,7 @@ class CascadeAdmin(skytools.AdminScript):
 
     def load_extra_status(self, curs, node):
         """Fetch extra info."""
+        # must be thread-safe (!)
         pass
 
     #