Skip to content

Commit 8a3f1e5

Browse files
committed
Fix inventory flushing
- it's a combo of handling a tuple and pickling of raw data, and is triggered under rare occasions
1 parent a9cc560 commit 8a3f1e5

4 files changed

Lines changed: 183 additions & 3 deletions

File tree

src/helper_startup.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@
3232

3333
logger = logging.getLogger('default')
3434

35+
#: The latest version of the keys.dat settings schema. Bump this
36+
#: when adding a new migration step in :func:`updateConfig` or
37+
#: :class:`class_sqlThread.sqlThread`.
38+
LATEST_SETTINGS_VERSION = 10
39+
3540
# The user may de-select Portable Mode in the settings if they want
3641
# the config files to stay in the application data folder.
3742
StoreConfigFilesInSameDirectoryAsProgramByDefault = False
@@ -72,7 +77,9 @@ def loadConfig():
7277
# no config file (or it cannot be accessed). Create config file.
7378
# config.add_section('bitmessagesettings')
7479
config.read()
75-
config.set('bitmessagesettings', 'settingsversion', '10')
80+
config.set(
81+
'bitmessagesettings', 'settingsversion',
82+
str(LATEST_SETTINGS_VERSION))
7683
if 'linux' in sys.platform:
7784
config.set('bitmessagesettings', 'minimizetotray', 'false')
7885
# This isn't implimented yet and when True on

src/storage/sqlite.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,12 @@ def flush(self):
109109
for objectHash, value in self._inventory.items():
110110
sql.execute(
111111
'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)',
112-
sqlite3.Binary(objectHash), *value)
112+
sqlite3.Binary(objectHash),
113+
value.type,
114+
value.stream,
115+
sqlite3.Binary(bytes(value.payload)),
116+
value.expires,
117+
sqlite3.Binary(bytes(value.tag)))
113118
self._inventory.clear()
114119

115120
def clean(self):

src/tests/test_config_process.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import os
66
import tempfile
77
from pybitmessage.bmconfigparser import config
8+
from pybitmessage.helper_startup import LATEST_SETTINGS_VERSION
89
from .test_process import TestProcessProto
910
from .common import skip_python3
1011

@@ -22,7 +23,8 @@ def test_config_defaults(self):
2223
config.read(os.path.join(self.home, 'keys.dat'))
2324

2425
self.assertEqual(config.safeGetInt(
25-
'bitmessagesettings', 'settingsversion'), 10)
26+
'bitmessagesettings', 'settingsversion'),
27+
LATEST_SETTINGS_VERSION)
2628
self.assertEqual(config.safeGetInt(
2729
'bitmessagesettings', 'port'), 8444)
2830
# don't connect

src/tests/test_inventory_flush.py

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
"""Tests for SqliteInventory.flush()"""
2+
# pylint: disable=protected-access,wrong-import-order,wrong-import-position
3+
# pylint: disable=import-outside-toplevel
4+
5+
import os
6+
import struct
7+
import tempfile
8+
import threading
9+
import time
10+
11+
from .common import skip_python3
12+
from .partial import TestPartialRun
13+
14+
skip_python3()
15+
16+
17+
class TestInventoryFlush(TestPartialRun):
18+
"""
19+
Integration test: exercises flush() end-to-end with the real sqlThread
20+
consumer running, so that type errors in parameter binding surface here
21+
rather than silently killing a production thread.
22+
"""
23+
24+
@classmethod
25+
def setUpClass(cls):
26+
os.environ['BITMESSAGE_HOME'] = tempfile.gettempdir()
27+
super(TestInventoryFlush, cls).setUpClass()
28+
29+
import helper_sql
30+
from bmconfigparser import config, config_ready
31+
from class_sqlThread import sqlThread
32+
from helper_startup import LATEST_SETTINGS_VERSION
33+
from storage.sqlite import SqliteInventory
34+
35+
cls._sqlStoredProcedure = staticmethod(helper_sql.sqlStoredProcedure)
36+
37+
# sqlThread.run() waits on config_ready and then reads
38+
# settingsversion; normally helper_startup.loadConfig() handles
39+
# both, but TestPartialRun only calls config.read() which loads
40+
# default.ini (no settingsversion). Set the minimum the
41+
# sqlThread needs so it can initialise the database.
42+
if not config.has_option(
43+
'bitmessagesettings', 'settingsversion'):
44+
config.set(
45+
'bitmessagesettings', 'settingsversion',
46+
str(LATEST_SETTINGS_VERSION))
47+
config_ready.set()
48+
49+
# test_api_thread replaces helper_sql.sql_ready with a mock
50+
# that only has wait(); restore a real Event so sqlThread can
51+
# call .set() on it. In Python 2 threading.Event is a factory
52+
# function, not a class, so we duck-type the check.
53+
cls._original_sql_ready = helper_sql.sql_ready
54+
if not hasattr(helper_sql.sql_ready, 'set'):
55+
helper_sql.sql_ready = threading.Event()
56+
57+
sql_lookup = sqlThread()
58+
sql_lookup.daemon = True
59+
sql_lookup.start()
60+
helper_sql.sql_ready.wait()
61+
cls.inventory = SqliteInventory()
62+
63+
@classmethod
64+
def tearDownClass(cls):
65+
import helper_sql
66+
from bmconfigparser import config_ready
67+
68+
cls._sqlStoredProcedure('exit')
69+
for thread in threading.enumerate():
70+
if thread.name == "SQL":
71+
thread.join(timeout=10)
72+
helper_sql.sql_ready = cls._original_sql_ready
73+
# Reset config to default.ini so added settingsversion does
74+
# not leak into subsequent tests. Also clear config_ready
75+
# since it is a one-shot event set by loadConfig().
76+
cls.config.read()
77+
config_ready.clear()
78+
super(TestInventoryFlush, cls).tearDownClass()
79+
80+
# -- helpers ----------------------------------------------------------
81+
82+
@staticmethod
83+
def _make_hash(seed):
84+
"""Return a 32-byte hash derived from *seed*."""
85+
return (b'\x00' * 31 + bytes([seed & 0xFF]))[-32:]
86+
87+
def _flush_and_check(self, obj_hash):
88+
"""
89+
Flush the inventory to the database, clear the _objects lookup
90+
cache so that __contains__ is forced to hit sqlite, then verify
91+
the hash is found via the normal inventory API.
92+
"""
93+
self.inventory.flush()
94+
self.inventory._objects.clear()
95+
self.assertIn(obj_hash, self.inventory)
96+
97+
# -- test cases -------------------------------------------------------
98+
99+
def test_flush_with_bytes_payload(self):
100+
"""Baseline: payload and tag are plain bytes."""
101+
h = self._make_hash(1)
102+
self.inventory[h] = (
103+
2, 1, b'\x80\x01' + os.urandom(64),
104+
int(time.time()) + 3600, b'\xff' * 32)
105+
self._flush_and_check(h)
106+
107+
def test_flush_with_memoryview_payload(self):
108+
"""
109+
Reproduce the production crash: payload and tag as memoryview
110+
cause 'Error binding parameter 3 - probably unsupported type.'
111+
"""
112+
h = self._make_hash(2)
113+
self.inventory[h] = (
114+
2, 1, memoryview(b'\x80\x02' + os.urandom(64)),
115+
int(time.time()) + 3600, memoryview(b'\xee' * 32))
116+
self._flush_and_check(h)
117+
118+
def test_flush_with_bytearray_payload(self):
119+
"""bytearray is another bytes-like type that could trip sqlite3."""
120+
h = self._make_hash(3)
121+
self.inventory[h] = (
122+
2, 1, bytearray(b'\x80\x03' + os.urandom(64)),
123+
int(time.time()) + 3600, bytearray(b'\xdd' * 32))
124+
self._flush_and_check(h)
125+
126+
def test_flush_with_empty_tag(self):
127+
"""Empty tag (b'') must not break the INSERT."""
128+
h = self._make_hash(4)
129+
self.inventory[h] = (
130+
2, 1, b'\x80\x04' + os.urandom(64),
131+
int(time.time()) + 3600, b'')
132+
self._flush_and_check(h)
133+
134+
# pylint: disable=redefined-variable-type
135+
def test_flush_multiple_mixed_types(self):
136+
"""Flush a batch of items with mixed blob types."""
137+
count = 20
138+
hashes = [self._make_hash(0x10 + i) for i in range(count)]
139+
expires = int(time.time()) + 3600
140+
141+
for i, h in enumerate(hashes):
142+
payload = struct.pack('>I', i) + os.urandom(60)
143+
tag = struct.pack('>I', i) + b'\x00' * 28
144+
if i % 3 == 0:
145+
payload = memoryview(payload)
146+
tag = memoryview(tag)
147+
elif i % 3 == 1:
148+
payload = bytearray(payload)
149+
tag = bytearray(tag)
150+
self.inventory[h] = (2, 1, payload, expires, tag)
151+
152+
self.inventory.flush()
153+
self.inventory._objects.clear()
154+
155+
for i, h in enumerate(hashes):
156+
self.assertIn(
157+
h, self.inventory,
158+
"Item {} missing after batch flush".format(i))
159+
160+
def test_flush_clears_memory_cache(self):
161+
"""After flush the in-memory _inventory dict must be empty."""
162+
h = self._make_hash(0xF0)
163+
self.inventory[h] = (
164+
2, 1, b'\x00' * 32, int(time.time()) + 3600, b'')
165+
self.inventory.flush()
166+
self.assertEqual(len(self.inventory._inventory), 0)

0 commit comments

Comments
 (0)