forked from ctrl-labs/cringbuffer
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_ringbuffer2.py
More file actions
37 lines (28 loc) · 1.1 KB
/
test_ringbuffer2.py
File metadata and controls
37 lines (28 loc) · 1.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import ringbuffer
import ctypes
class TStruct(ctypes.Structure):
    """Record type stored in the ring buffer under test.

    Fields:
        i: 32-bit signed integer; the only field the test in this file
           sets and compares.
        f: C float; left at its default value by the test in this file.
    """

    _fields_ = (
        ('i', ctypes.c_int32),
        ('f', ctypes.c_float),
    )
def _assert_i_values(records, expected_values):
    """Assert that ``records[k].i`` equals the k-th expected value, in order.

    Only the first ``len(expected_values)`` positions of ``records`` are
    indexed, and only the ``i`` field is compared.
    """
    for position, expected_i in enumerate(expected_values):
        assert records[position].i == expected_i


def test_ringbuffer_write_multiple():
    """Write two batches with ``try_write_multiple`` and read them back in order.

    Sequence exercised against a 10-slot ``RingBuffer`` of ``TStruct``:

    1. write 8 records with ``i`` = 0..7, then read 6 and expect 0..5;
    2. write 6 more records with ``i`` = 8..13, then read 8 and expect 6..13.

    NOTE(review): 14 records are written in total to a 10-slot ring, so the
    second write presumably has to wrap past the end of the buffer -- confirm
    against the ``ringbuffer`` implementation.
    """
    ring = ringbuffer.RingBuffer(c_type=TStruct, slot_count=10)
    # The writer handle is not needed by the test; it is only registered.
    ring.new_writer()
    reader = ring.new_reader()

    # First batch: i = 0..7.
    data = (TStruct * 8)()
    for i in range(0, 8):
        data[i] = TStruct(i=i)
    ring.try_write_multiple(data)

    received_data = ring.try_read(reader, length=6)
    _assert_i_values(received_data, range(0, 6))

    # Second batch: i = 8..13, written while records 6 and 7 are still unread.
    data = (TStruct * 6)()
    for slot, value in enumerate(range(8, 14)):
        data[slot] = TStruct(i=value)
    ring.try_write_multiple(data)

    # The two leftover records must come out ahead of the new batch.
    received_data = ring.try_read(reader, length=8)
    _assert_i_values(received_data, range(6, 14))