|
|
|
#
|
|
|
|
# (c) 2017 Aleph Objects, Inc.
|
|
|
|
#
|
|
|
|
# The code in this page is free software: you can
|
|
|
|
# redistribute it and/or modify it under the terms of the GNU
|
|
|
|
# General Public License (GNU GPL) as published by the Free Software
|
|
|
|
# Foundation, either version 3 of the License, or (at your option)
|
|
|
|
# any later version. The code is distributed WITHOUT ANY WARRANTY;
|
|
|
|
# without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
|
|
# FOR A PARTICULAR PURPOSE. See the GNU GPL for more details.
|
|
|
|
#
|
|
|
|
|
|
|
|
import random
|
|
|
|
import string
|
|
|
|
|
|
|
|
class NoisySerialConnection:
    """Wrapper class which injects character noise into data of
    a serial connection for testing Marlin's error correction.

    The wrapped object only needs to provide the members this class
    forwards to: write(), readline(), in_waiting, flush() and close().
    Error rates default to 0, so a fresh wrapper is a transparent
    pass-through until setWriteErrorRate()/setReadErrorRate() is called.
    """

    def __init__(self, serial):
        """Wrap `serial`; noise injection starts out disabled."""
        self.serial = serial
        # Probability (0.0 - 1.0) that a given readline()/write() is corrupted.
        self.readErrorRate = 0
        self.writeErrorRate = 0

    def _corruptData(self, data):
        """Introduces a single character error on a string.

        Works on str as well as bytes-like data and returns a value of the
        same type and length with exactly one position replaced by an ASCII
        letter that differs from the character originally at that position.
        Empty input is returned unchanged since there is nothing to corrupt.
        """
        if not data:
            return data
        isText = isinstance(data, str)
        charToCorrupt = random.randrange(len(data))
        # Slice (rather than index) so that bytes yield a length-1 bytes
        # object instead of an int, keeping the comparison below uniform.
        original = data[charToCorrupt:charToCorrupt + 1]
        # Exclude the original character: otherwise roughly 1 in 52
        # "corruptions" of a letter would leave the data untouched.
        candidates = [
            letter if isText else letter.encode()
            for letter in string.ascii_letters
        ]
        badChar = random.choice([c for c in candidates if c != original])
        return data[:charToCorrupt] + badChar + data[charToCorrupt + 1:]

    def write(self, data):
        """Write `data` to the wrapped connection, possibly corrupting it.

        With probability writeErrorRate a single character of `data` is
        replaced before it is sent. Returns whatever the wrapped write()
        returns.
        """
        if random.random() < self.writeErrorRate:
            data = self._corruptData(data)
        return self.serial.write(data)

    def readline(self):
        """Read one line from the wrapped connection, possibly corrupting it.

        With probability readErrorRate a single character of the received
        line is replaced before it is returned.
        """
        data = self.serial.readline()
        if random.random() < self.readErrorRate:
            data = self._corruptData(data)
        return data

    @property
    def in_waiting(self):
        """Forward the wrapped connection's in_waiting attribute."""
        return self.serial.in_waiting

    def flush(self):
        """Forward flush() to the wrapped connection."""
        self.serial.flush()

    def close(self):
        """Forward close() to the wrapped connection."""
        self.serial.close()

    def setWriteErrorRate(self, badWrites, totalWrites):
        """Inserts a single character error into every badWrites out of totalWrites.

        The ratio is applied as a per-write probability, so the proportion
        of corrupted writes is statistical rather than exact.
        Raises ZeroDivisionError if totalWrites is zero.
        """
        self.writeErrorRate = float(badWrites) / float(totalWrites)

    def setReadErrorRate(self, badReads, totalReads):
        """Inserts a single character error into every badReads out of totalReads.

        The ratio is applied as a per-read probability, so the proportion
        of corrupted reads is statistical rather than exact.
        Raises ZeroDivisionError if totalReads is zero.
        """
        self.readErrorRate = float(badReads) / float(totalReads)
|