Skip to content

Commit c232085

Browse files
committed
Fix inventory flushing
- it's a combo of handling a tuple and pickling of raw data, and is triggered under rare occasions
1 parent a9cc560 commit c232085

4 files changed

Lines changed: 175 additions & 3 deletions

File tree

src/helper_startup.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@
3232

3333
logger = logging.getLogger('default')
3434

35+
#: The latest version of the keys.dat settings schema. Bump this
36+
#: when adding a new migration step in :func:`updateConfig` or
37+
#: :class:`class_sqlThread.sqlThread`.
38+
LATEST_SETTINGS_VERSION = 10
39+
3540
# The user may de-select Portable Mode in the settings if they want
3641
# the config files to stay in the application data folder.
3742
StoreConfigFilesInSameDirectoryAsProgramByDefault = False
@@ -72,7 +77,9 @@ def loadConfig():
7277
# no config file (or it cannot be accessed). Create config file.
7378
# config.add_section('bitmessagesettings')
7479
config.read()
75-
config.set('bitmessagesettings', 'settingsversion', '10')
80+
config.set(
81+
'bitmessagesettings', 'settingsversion',
82+
str(LATEST_SETTINGS_VERSION))
7683
if 'linux' in sys.platform:
7784
config.set('bitmessagesettings', 'minimizetotray', 'false')
7885
# This isn't implimented yet and when True on

src/storage/sqlite.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,12 @@ def flush(self):
109109
for objectHash, value in self._inventory.items():
110110
sql.execute(
111111
'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)',
112-
sqlite3.Binary(objectHash), *value)
112+
sqlite3.Binary(objectHash),
113+
value.type,
114+
value.stream,
115+
sqlite3.Binary(value.payload),
116+
value.expires,
117+
sqlite3.Binary(value.tag))
113118
self._inventory.clear()
114119

115120
def clean(self):

src/tests/test_config_process.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import os
66
import tempfile
77
from pybitmessage.bmconfigparser import config
8+
from pybitmessage.helper_startup import LATEST_SETTINGS_VERSION
89
from .test_process import TestProcessProto
910
from .common import skip_python3
1011

@@ -22,7 +23,8 @@ def test_config_defaults(self):
2223
config.read(os.path.join(self.home, 'keys.dat'))
2324

2425
self.assertEqual(config.safeGetInt(
25-
'bitmessagesettings', 'settingsversion'), 10)
26+
'bitmessagesettings', 'settingsversion'),
27+
LATEST_SETTINGS_VERSION)
2628
self.assertEqual(config.safeGetInt(
2729
'bitmessagesettings', 'port'), 8444)
2830
# don't connect

src/tests/test_inventory_flush.py

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
"""Tests for SqliteInventory.flush()"""
2+
# pylint: disable=protected-access,wrong-import-order,wrong-import-position
3+
# pylint: disable=import-outside-toplevel
4+
5+
import os
6+
import struct
7+
import tempfile
8+
import threading
9+
import time
10+
11+
from .common import skip_python3
12+
from .partial import TestPartialRun
13+
14+
skip_python3()
15+
16+
17+
class TestInventoryFlush(TestPartialRun):
18+
"""
19+
Integration test: exercises flush() end-to-end with the real sqlThread
20+
consumer running, so that type errors in parameter binding surface here
21+
rather than silently killing a production thread.
22+
"""
23+
24+
@classmethod
25+
def setUpClass(cls):
26+
os.environ['BITMESSAGE_HOME'] = tempfile.gettempdir()
27+
super(TestInventoryFlush, cls).setUpClass()
28+
29+
import helper_sql
30+
from bmconfigparser import config, config_ready
31+
from class_sqlThread import sqlThread
32+
from helper_startup import LATEST_SETTINGS_VERSION
33+
from storage.sqlite import SqliteInventory
34+
35+
cls._sqlStoredProcedure = staticmethod(helper_sql.sqlStoredProcedure)
36+
37+
# sqlThread.run() waits on config_ready and then reads
38+
# settingsversion; normally helper_startup.loadConfig() handles
39+
# both, but TestPartialRun only calls config.read() which loads
40+
# default.ini (no settingsversion). Set the minimum the
41+
# sqlThread needs so it can initialise the database.
42+
if not config.has_option(
43+
'bitmessagesettings', 'settingsversion'):
44+
config.set(
45+
'bitmessagesettings', 'settingsversion',
46+
str(LATEST_SETTINGS_VERSION))
47+
config_ready.set()
48+
49+
# test_api_thread replaces helper_sql.sql_ready with a mock
50+
# that only has wait(); restore a real Event so sqlThread can
51+
# call .set() on it.
52+
cls._original_sql_ready = helper_sql.sql_ready
53+
if not isinstance(helper_sql.sql_ready, threading.Event):
54+
helper_sql.sql_ready = threading.Event()
55+
56+
sql_lookup = sqlThread()
57+
sql_lookup.daemon = True
58+
sql_lookup.start()
59+
helper_sql.sql_ready.wait()
60+
cls.inventory = SqliteInventory()
61+
62+
@classmethod
63+
def tearDownClass(cls):
64+
import helper_sql
65+
cls._sqlStoredProcedure('exit')
66+
for thread in threading.enumerate():
67+
if thread.name == "SQL":
68+
thread.join(timeout=10)
69+
helper_sql.sql_ready = cls._original_sql_ready
70+
super(TestInventoryFlush, cls).tearDownClass()
71+
72+
# -- helpers ----------------------------------------------------------
73+
74+
@staticmethod
75+
def _make_hash(seed):
76+
"""Return a 32-byte hash derived from *seed*."""
77+
return (b'\x00' * 31 + bytes([seed & 0xFF]))[-32:]
78+
79+
def _flush_and_check(self, obj_hash):
80+
"""
81+
Flush the inventory to the database, clear the _objects lookup
82+
cache so that __contains__ is forced to hit sqlite, then verify
83+
the hash is found via the normal inventory API.
84+
"""
85+
self.inventory.flush()
86+
self.inventory._objects.clear()
87+
self.assertIn(obj_hash, self.inventory)
88+
89+
# -- test cases -------------------------------------------------------
90+
91+
def test_flush_with_bytes_payload(self):
92+
"""Baseline: payload and tag are plain bytes."""
93+
h = self._make_hash(1)
94+
self.inventory[h] = (
95+
2, 1, b'\x80\x01' + os.urandom(64),
96+
int(time.time()) + 3600, b'\xff' * 32)
97+
self._flush_and_check(h)
98+
99+
def test_flush_with_memoryview_payload(self):
100+
"""
101+
Reproduce the production crash: payload and tag as memoryview
102+
cause 'Error binding parameter 3 - probably unsupported type.'
103+
"""
104+
h = self._make_hash(2)
105+
self.inventory[h] = (
106+
2, 1, memoryview(b'\x80\x02' + os.urandom(64)),
107+
int(time.time()) + 3600, memoryview(b'\xee' * 32))
108+
self._flush_and_check(h)
109+
110+
def test_flush_with_bytearray_payload(self):
111+
"""bytearray is another bytes-like type that could trip sqlite3."""
112+
h = self._make_hash(3)
113+
self.inventory[h] = (
114+
2, 1, bytearray(b'\x80\x03' + os.urandom(64)),
115+
int(time.time()) + 3600, bytearray(b'\xdd' * 32))
116+
self._flush_and_check(h)
117+
118+
def test_flush_with_empty_tag(self):
119+
"""Empty tag (b'') must not break the INSERT."""
120+
h = self._make_hash(4)
121+
self.inventory[h] = (
122+
2, 1, b'\x80\x04' + os.urandom(64),
123+
int(time.time()) + 3600, b'')
124+
self._flush_and_check(h)
125+
126+
# pylint: disable=redefined-variable-type
127+
def test_flush_multiple_mixed_types(self):
128+
"""Flush a batch of items with mixed blob types."""
129+
count = 20
130+
hashes = [self._make_hash(0x10 + i) for i in range(count)]
131+
expires = int(time.time()) + 3600
132+
133+
for i, h in enumerate(hashes):
134+
payload = struct.pack('>I', i) + os.urandom(60)
135+
tag = struct.pack('>I', i) + b'\x00' * 28
136+
if i % 3 == 0:
137+
payload = memoryview(payload)
138+
tag = memoryview(tag)
139+
elif i % 3 == 1:
140+
payload = bytearray(payload)
141+
tag = bytearray(tag)
142+
self.inventory[h] = (2, 1, payload, expires, tag)
143+
144+
self.inventory.flush()
145+
self.inventory._objects.clear()
146+
147+
for i, h in enumerate(hashes):
148+
self.assertIn(
149+
h, self.inventory,
150+
"Item {} missing after batch flush".format(i))
151+
152+
def test_flush_clears_memory_cache(self):
153+
"""After flush the in-memory _inventory dict must be empty."""
154+
h = self._make_hash(0xF0)
155+
self.inventory[h] = (
156+
2, 1, b'\x00' * 32, int(time.time()) + 3600, b'')
157+
self.inventory.flush()
158+
self.assertEqual(len(self.inventory._inventory), 0)

0 commit comments

Comments
 (0)