"""
main.py
=======
The core module of my project. Controls a sabertooth dual motor with a sony DS4 remote on a RPi4.
**Program 1 – Controller Client**
Reads the controller using pyPS4Controller Library.
Changes in left joystick position are written to queue as 'vert' and 'horz'.
Joystick map is +/-32767.
Be sure to connect DS4 in bluetooth first using RPI's
**Program 2 – Motor Server**
Reads out of the queue, and sends PWM signals to the two motor driver signals
Currently all pinouts are hardcoded as GPIO 12 and 13. Duty cycle 50%.
The sabertooth switch configuation is in RC mode as follows:
| Swtch: 1|2|3|4|5|6
| Postn: D|U|U|U|U|U
In RC Mode, motor direction is a function of PWM frequency. These may/will change depending on configuration (input voltage, driver, duty cycle, gremlins). The mapping of frequencies to output voltage were collected by hand in SABERTOOTH_VOLTAGE_FREQ_MAP.ipynb, and are fit with a quadratic function.
Notes
-----
Multithreading is used to listen to the controller in a separate process.
A Queue is used to deliver the most recent command as a JSON string.
"""
[docs]def main():
"""
Main program entry.
"""
queue_camera = Queue()
queue_controller = Queue()
cam = cv2.VideoCapture(0)
pi = pigpio.pi()
process_a = Process(
target = DS4Client,
args=(queue_controller,)
)
process_b = Process(
target = SabertoothServer,
args = (queue_controller,queue_camera,pi)
)
process_c = Process(
target = CameraClient,
args = (queue_camera,cam)
)
process_c.start() #cam takes a second to load up
time.sleep(1)
process_a.start()
process_b.start()
print('ctrl+c to exit')
try:
while True:
if not (process_a.is_alive() and \
process_b.is_alive() and \
process_c.is_alive()):
raise KeyboardInterrupt
time.sleep(1)
except KeyboardInterrupt:
print("Performing Clean Exit")
pass
process_a.terminate()
process_b.terminate()
process_c.terminate()
process_a.join()
process_b.join()
process_c.join()
pi.stop()
if __name__=="__main__":
import numpy as np
from motor_control import DS4Client, SabertoothServer
from camera_utils import CameraClient
import cv2, pigpio,os,errno,json,math,time
from queue import Empty
from multiprocessing import Queue, Process
main()
## The rest of this is old code or docs converted from doxygen
#"""! @brief stereo_robot main startup script """
#
###
## @file Main Startup Script
##
## @section description_main Description
## Controls a sabertooth dual motor with a sony DS4 remote on a RPi4.
##
##
## @section notes_main Notes
## Multithreading is used to listen to the controller in a separate process.
## A Queue is used to deliver the most recent command as a JSON string.
## PROGRAM 1: Controller Client
## Reads the controller using pyPS4Controller Library.
## Changes in left joystick position are written to queue as 'vert' and 'horz'.
## Joystick map is +/-32767
## Be sure to connect DS4 in bluetooth first using RPI's
##
## PROGRAM 2: Motor Server
## Reads out of the queue, and sends PWM signals to the two motor driver signals
## Currently all pinouts are hardcoded as GPIO 12 and 13. Duty cycle 50%.
## The sabertooth switch configuation is in RC mode as follows:
## | 1 | 2 | 3 | 4 | 5 | 6 |
## |DWN| UP| UP| UP| UP|DWN|
## In RC Mode, motor direction is a function of PWM frequency. These may/will
## change depending on configuration (input voltage, driver, duty cycle, gremlins).
## The mapping of frequencies to output voltage were collected by hand in
## SABERTOOTH_VOLTAGE_FREQ_MAP.ipynb, and are fit with a quadratic function.
# FIFO = 'mypipe'
#
# try:
# os.mkfifo(FIFO)
# except OSError as oe:
# if oe.errno != errno.EEXIST:
# raise
# PROGRAM 1: Controller client
# from pyPS4Controller.controller import Controller
#
# class MyController(Controller):
# def __init__(self,pipe, **kwargs):
# Controller.__init__(self, **kwargs)
# self.control = {'vert':0,'horz':0}
# self.pipe = pipe
# self.write_controller()
#
# def write_controller(self):
# self.pipe.put(self.control)
#
#
# def on_x_press(self):
# print("Hello world")
#
# def on_x_release(self):
# print("Goodbye world")
#
# def on_L3_left(self,value):
# self.control['horz'] = value
# self.write_controller()
#
# def on_L3_right(self,value):
# self.control['horz'] = value
# self.write_controller()
#
# def on_L3_up(self,value):
# self.control['vert'] = value
# self.write_controller()
#
# def on_L3_down(self,value):
# self.control['vert'] = value
# self.write_controller()
#
# def on_L3_x_at_rest(self):
# self.control['horz'] = 0
# self.write_controller()
#
# def on_L3_y_at_rest(self):
# self.control['vert'] = 0
# self.write_controller()
#
# def client(q):
# controller = MyController(q,interface="/dev/input/js0", connecting_using_ds4drv=False)
# controller.listen(timeout=60)
## PROGRAM 2: Motor Control
#def accelMotor(current,target,step):
# if current==target:
# return int(target)
# direction = math.copysign(1,target-current)
# next_ = step*direction + current
# if direction > 0:
# if target-next_<0: #if we overshot:
# return int(target)
# else:
# return int(next_)
# else:
# if target-next_>0:
# return int(target)
# else:
# return int(next_)
#
## collected data to map frequencies to motor voltages. Tested on Kastar 12v
#FREQS = np.array([250, 300, 350, 400, 450, 500, 550])
#VOLTS = np.array([ -12.13, -2.6, 0.0, 3, 6.8, 10.9, 11.8])
#
#COEF = np.polyfit(VOLTS,FREQS,2)
#freqFun = lambda x: sum(COEF*[x**2,x,1])
#
#
#PIN_MTR_L = 13
#PIN_MTR_R = 12
#
#MTR_PWMduty = int(0.5e6) # Motor duty cycle range from 0 to 1e6
#MTR_ACCEL = 1
#
#JOYSTICK_MAX = 32767 # These are used to map the position of DS4 to a motor voltage
#MTR_VOLT_MAX = 10 # V
#
#TURN_TIME_90 = 10
#
#def motor_differential(vert,horz):
# """
# Takes a vert and horz signal (both from [-1,1]) and splits it up into a left and right
# wheel command. I.e. vert = 1, horz = 0 --> left = 1, right = 1
# vert = 1, horz = 1 --> left = 0.75, right = 0.25
# """
# L = -(vert -0.5*horz)
# R = -(vert +0.5*horz)
# L = min(max(L,-1),1)
# R = min(max(R,-1),1)
# return L,R
# def server(q):
# # Setup PIGPIO
# pi = pigpio.pi()
# pi.set_mode(PIN_MTR_L,pigpio.OUTPUT)
# pi.set_mode(PIN_MTR_R,pigpio.OUTPUT)
#
# L_freq,L_freq_target = 0,0
# R_freq,R_freq_target = 0,0
#
# while True:
# # read the controller
# try:
# message = q.get_nowait()
# # print('recieved: ', message)
# controller_state = message
#
# DS4_vert = controller_state['vert']/JOYSTICK_MAX
# DS4_horz = controller_state['horz']/JOYSTICK_MAX
#
# L,R = motor_differential(DS4_vert,DS4_horz)
#
# L_freq_target = freqFun(L*MTR_VOLT_MAX)
# R_freq_target = freqFun(R*MTR_VOLT_MAX)
#
#
#
# except Empty:
# pass
#
# L_freq = accelMotor(L_freq,L_freq_target,MTR_ACCEL)
# R_freq = accelMotor(R_freq,R_freq_target,MTR_ACCEL)
# # print(f'left: L: {L*MTR_VOLT_MAX} freq:{L_freq}')
# # print(f'right: R: {R*MTR_VOLT_MAX} freq:{R_freq}')
#
# pi.hardware_PWM(PIN_MTR_L,L_freq,MTR_PWMduty)
# pi.hardware_PWM(PIN_MTR_R,R_freq,MTR_PWMduty)
# process.start()
# process.join()
# server(queue)
# you can start listening before controller is paired, as long as you pair it within the timeout window