Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import streamlit.components.v1 as components | |
| import threading | |
| import time | |
| from queue import Queue | |
| import mido # For MIDI I/O | |
| from mido import Message | |
| from pyo import Server, Sine, SfPlayer, Mixer, Notein, MidiAdsr | |
| # ========================= | |
| # 1) AUDIO ENGINE (pyo) | |
| # ========================= | |
| # We’ll create a pyo Server at module load. Adjust sample rate, buffers, etc. as needed. | |
| # In many environments, pyo wants to open an audio stream. This might conflict with | |
| # Streamlit’s runtime if it's not set up for real-time audio. | |
| # We'll do a basic attempt: | |
| AUDIO_SR = 44100 | |
| s = Server(sr=AUDIO_SR, nchnls=2, buffersize=1024, duplex=1).boot() | |
| s.start() | |
| # We'll keep a global dictionary of active pyo objects for "notes" to allow polyphony. | |
| active_oscillators = {} | |
| # A simple function to generate or retrieve an oscillator for a given note. | |
| def note_on(note, velocity=100, synth_type='sine'): | |
| """ | |
| Trigger or re-trigger a note with pyo-based oscillator or sample player. | |
| """ | |
| # Example approach: a simple sine wave whose frequency is set by MIDI note number | |
| freq = mido.midifrequencies[note] # Mido has a built-in freq table | |
| amp = velocity / 127.0 * 0.3 # scale amplitude by velocity, 0.3 is arbitrary | |
| if note not in active_oscillators: | |
| # Create a new oscillator for that note | |
| if synth_type == 'sine': | |
| osc = Sine(freq=freq, mul=amp).out() | |
| else: | |
| # For demonstration, we can also do a sample-based approach if you want: | |
| # osc = SfPlayer("path_to_some_sample.wav", speed=1, loop=False, mul=amp).out() | |
| osc = Sine(freq=freq, mul=amp).out() | |
| active_oscillators[note] = osc | |
| else: | |
| # If the note is already on, you could re-trigger or update amplitude, etc. | |
| osc = active_oscillators[note] | |
| osc.setFreq(freq) | |
| osc.mul = amp | |
| def note_off(note): | |
| """ | |
| Stop a note by turning off or freeing the oscillator. | |
| """ | |
| if note in active_oscillators: | |
| osc = active_oscillators[note] | |
| osc.stop() # immediately stop | |
| del active_oscillators[note] | |
| # If you want a more advanced poly-synth approach, you might consider `Notein`, `MidiAdsr`, etc. | |
| # ========================= | |
| # 2) DRUM / LOOPS | |
| # ========================= | |
| # For drum pads, we can load multiple short samples. | |
| # We'll store them in a dictionary to trigger by index or note number: | |
| drum_samples = { | |
| 0: "samples/kick.wav", | |
| 1: "samples/snare.wav", | |
| 2: "samples/hihat.wav", | |
| 3: "samples/clap.wav", | |
| # ... | |
| } | |
| def drum_trigger(index, velocity=100): | |
| """Simple function to trigger a drum sample from a dictionary of sample files.""" | |
| if index not in drum_samples: | |
| return | |
| vol = velocity / 127.0 * 0.8 | |
| # Create a one-shot player | |
| sfp = SfPlayer(drum_samples[index], loop=False, mul=vol).out() | |
| # ========================= | |
| # 3) ARPEGGIATOR EXAMPLE | |
| # ========================= | |
| class Arpeggiator: | |
| def __init__(self, bpm=120): | |
| self.bpm = bpm | |
| self.notes_held = set() | |
| self.running = False | |
| self.thread = None | |
| def start(self): | |
| if self.running: | |
| return | |
| self.running = True | |
| self.thread = threading.Thread(target=self.run, daemon=True) | |
| self.thread.start() | |
| def stop(self): | |
| self.running = False | |
| if self.thread: | |
| self.thread.join() | |
| def run(self): | |
| # Very simple up pattern | |
| delay = 60.0 / self.bpm / 2.0 # half of a quarter note => eighth notes | |
| idx = 0 | |
| while self.running: | |
| if self.notes_held: | |
| sorted_notes = sorted(list(self.notes_held)) | |
| note = sorted_notes[idx % len(sorted_notes)] | |
| note_on(note, 100) # arpeggiator triggers a note_on | |
| time.sleep(delay * 0.5) | |
| note_off(note) # note_off after half the step | |
| time.sleep(delay * 0.5) | |
| idx += 1 | |
| else: | |
| time.sleep(0.01) | |
| def note_on(self, note): | |
| self.notes_held.add(note) | |
| def note_off(self, note): | |
| if note in self.notes_held: | |
| self.notes_held.remove(note) | |
| # ========================= | |
| # 4) HTML + JS | |
| # ========================= | |
| def get_keyboard_html(): | |
| """ | |
| Returns an HTML snippet for a 5-octave Qwerty-Hancock keyboard | |
| from 'C3' upward. | |
| """ | |
| return """ | |
| <!DOCTYPE html> | |
| <html> | |
| <head> | |
| <style> | |
| #keyboard { | |
| margin: 0 auto; | |
| } | |
| .qwerty-hancock-wrapper { | |
| width: 900px; /* Adjust to taste */ | |
| margin: 0 auto; | |
| } | |
| </style> | |
| <script src="https://cdnjs.cloudflare.com/ajax/libs/qwerty-hancock/0.10.0/qwerty-hancock.min.js"></script> | |
| </head> | |
| <body> | |
| <div class="qwerty-hancock-wrapper"> | |
| <div id="keyboard"></div> | |
| </div> | |
| <script> | |
| // 5 octaves from C3 to (C3 + 5 octaves => C8 is beyond 5, but let's do ~ C7). | |
| const keyboard = new QwertyHancock({ | |
| id: 'keyboard', | |
| width: 900, | |
| height: 150, | |
| octaves: 5, | |
| startNote: 'C3', | |
| whiteKeyColour: 'white', | |
| blackKeyColour: '#444', | |
| activeColour: '#FF6961' | |
| }); | |
| // Build a note->MIDI dictionary. We'll do some approximate mappings: | |
| // C3=48, C#3=49, ... up to B7 or so. Expand as needed. | |
| // We'll hardcode for demonstration, or generate dynamically. | |
| const noteToMidi = { | |
| 'C3':48,'C#3':49,'D3':50,'D#3':51,'E3':52,'F3':53,'F#3':54,'G3':55,'G#3':56,'A3':57,'A#3':58,'B3':59, | |
| 'C4':60,'C#4':61,'D4':62,'D#4':63,'E4':64,'F4':65,'F#4':66,'G4':67,'G#4':68,'A4':69,'A#4':70,'B4':71, | |
| 'C5':72,'C#5':73,'D5':74,'D#5':75,'E5':76,'F5':77,'F#5':78,'G5':79,'G#5':80,'A5':81,'A#5':82,'B5':83, | |
| 'C6':84,'C#6':85,'D6':86,'D#6':87,'E6':88,'F6':89,'F#6':90,'G6':91,'G#6':92,'A6':93,'A#6':94,'B6':95, | |
| 'C7':96,'C#7':97,'D7':98,'D#7':99,'E7':100,'F7':101,'F#7':102,'G7':103,'G#7':104,'A7':105,'A#7':106,'B7':107 | |
| }; | |
| keyboard.keyDown = function (note, freq) { | |
| const midiNote = noteToMidi[note]; | |
| if(midiNote !== undefined){ | |
| window.parent.postMessage({ | |
| type: 'midiEvent', | |
| data: { | |
| type: 'noteOn', | |
| note: midiNote, | |
| velocity: 100 | |
| } | |
| }, '*'); | |
| } | |
| }; | |
| keyboard.keyUp = function (note, freq) { | |
| const midiNote = noteToMidi[note]; | |
| if(midiNote !== undefined){ | |
| window.parent.postMessage({ | |
| type: 'midiEvent', | |
| data: { | |
| type: 'noteOff', | |
| note: midiNote, | |
| velocity: 0 | |
| } | |
| }, '*'); | |
| } | |
| }; | |
| </script> | |
| </body> | |
| </html> | |
| """ | |
| def get_drum_pads_html(): | |
| """ | |
| Returns an HTML snippet for a 4x4 (16) grid of drum pads. | |
| Each pad sends a 'drumTrigger' event with index 0..15. | |
| """ | |
| return """ | |
| <!DOCTYPE html> | |
| <html> | |
| <head> | |
| <style> | |
| .drum-grid { | |
| display: grid; | |
| grid-template-columns: repeat(4, 80px); | |
| grid-gap: 10px; | |
| width: max-content; | |
| margin: 0 auto; | |
| } | |
| .drum-pad { | |
| width: 80px; | |
| height: 80px; | |
| background-color: #666; | |
| display: flex; | |
| align-items: center; | |
| justify-content: center; | |
| color: #fff; | |
| font-weight: bold; | |
| font-size: 1.2em; | |
| cursor: pointer; | |
| user-select: none; | |
| border-radius: 8px; | |
| } | |
| .drum-pad:active { | |
| background-color: #999; | |
| } | |
| </style> | |
| </head> | |
| <body> | |
| <div class="drum-grid"> | |
| <div class="drum-pad" data-pad="0">Pad 1</div> | |
| <div class="drum-pad" data-pad="1">Pad 2</div> | |
| <div class="drum-pad" data-pad="2">Pad 3</div> | |
| <div class="drum-pad" data-pad="3">Pad 4</div> | |
| <div class="drum-pad" data-pad="4">Pad 5</div> | |
| <div class="drum-pad" data-pad="5">Pad 6</div> | |
| <div class="drum-pad" data-pad="6">Pad 7</div> | |
| <div class="drum-pad" data-pad="7">Pad 8</div> | |
| <div class="drum-pad" data-pad="8">Pad 9</div> | |
| <div class="drum-pad" data-pad="9">Pad10</div> | |
| <div class="drum-pad" data-pad="10">Pad11</div> | |
| <div class="drum-pad" data-pad="11">Pad12</div> | |
| <div class="drum-pad" data-pad="12">Pad13</div> | |
| <div class="drum-pad" data-pad="13">Pad14</div> | |
| <div class="drum-pad" data-pad="14">Pad15</div> | |
| <div class="drum-pad" data-pad="15">Pad16</div> | |
| </div> | |
| <script> | |
| document.querySelectorAll('.drum-pad').forEach(pad => { | |
| pad.addEventListener('mousedown', () => { | |
| let padIndex = parseInt(pad.getAttribute('data-pad')); | |
| window.parent.postMessage({ | |
| type: 'drumTrigger', | |
| data: { | |
| padIndex: padIndex, | |
| velocity: 100 | |
| } | |
| }, '*'); | |
| }); | |
| }); | |
| </script> | |
| </body> | |
| </html> | |
| """ | |
| # ========================= | |
| # 5) STREAMLIT APP | |
| # ========================= | |
| def main(): | |
| st.title("Python Synth with 5-Octave Keyboard + Drum Pads (pyo-based)") | |
| # Arpeggiator in session state | |
| if 'arpeggiator' not in st.session_state: | |
| st.session_state.arpeggiator = Arpeggiator(bpm=120) | |
| # BPM slider | |
| st.session_state.arpeggiator.bpm = st.slider("Arpeggiator BPM", 60, 240, 120) | |
| use_arp = st.checkbox("Enable Arpeggiator", value=False) | |
| if use_arp: | |
| st.session_state.arpeggiator.start() | |
| else: | |
| st.session_state.arpeggiator.stop() | |
| # MIDI I/O | |
| st.subheader("MIDI Ports") | |
| in_ports = mido.get_input_names() | |
| out_ports = mido.get_output_names() | |
| input_choice = st.selectbox("MIDI Input", ["None"] + in_ports) | |
| output_choice = st.selectbox("MIDI Output", ["None"] + out_ports) | |
| # Manage opening/closing | |
| if 'midi_in' not in st.session_state: | |
| st.session_state.midi_in = None | |
| if 'midi_out' not in st.session_state: | |
| st.session_state.midi_out = None | |
| # Callback for incoming hardware MIDI | |
| def midi_in_callback(msg): | |
| if msg.type in ['note_on', 'note_off']: | |
| # Convert to dictionary | |
| event = { | |
| 'type': 'noteOn' if msg.type == 'note_on' else 'noteOff', | |
| 'note': msg.note, | |
| 'velocity': msg.velocity | |
| } | |
| st.session_state.incoming_events.put(event) | |
| def open_input(port): | |
| if port == "None": | |
| return None | |
| return mido.open_input(port, callback=midi_in_callback) | |
| def open_output(port): | |
| if port == "None": | |
| return None | |
| return mido.open_output(port) | |
| # Re-open if changed | |
| if st.session_state.midi_in and st.session_state.midi_in.name != input_choice: | |
| st.session_state.midi_in.close() | |
| st.session_state.midi_in = open_input(input_choice) | |
| elif not st.session_state.midi_in and input_choice != "None": | |
| st.session_state.midi_in = open_input(input_choice) | |
| if st.session_state.midi_out and st.session_state.midi_out.name != output_choice: | |
| st.session_state.midi_out.close() | |
| st.session_state.midi_out = open_output(output_choice) | |
| elif not st.session_state.midi_out and output_choice != "None": | |
| st.session_state.midi_out = open_output(output_choice) | |
| st.write("Press keys on hardware (if connected) or use the on-screen UI below:") | |
| # On-screen 5-octave keyboard | |
| st.subheader("5-Octave Keyboard") | |
| components.html(get_keyboard_html(), height=220) | |
| # Drum pads | |
| st.subheader("Drum Pads (16)") | |
| components.html(get_drum_pads_html(), height=220) | |
| # Hidden script to route postMessage -> Streamlit | |
| components.html(""" | |
| <script> | |
| window.addEventListener('message', function(e) { | |
| if (e.data.type === 'midiEvent') { | |
| // forward to parent's postMessage | |
| window.parent.postMessage({ | |
| type: 'streamlit:message', | |
| data: { | |
| type: 'midi_event', | |
| event: e.data.data | |
| } | |
| }, '*'); | |
| } else if (e.data.type === 'drumTrigger') { | |
| window.parent.postMessage({ | |
| type: 'streamlit:message', | |
| data: { | |
| type: 'drum_event', | |
| event: e.data.data | |
| } | |
| }, '*'); | |
| } | |
| }); | |
| </script> | |
| """, height=0) | |
| # We'll store inbound events in a queue | |
| if 'incoming_events' not in st.session_state: | |
| st.session_state.incoming_events = Queue() | |
| # A small debug output | |
| debug_area = st.empty() | |
| # We define a function to dispatch events to pyo and optionally to MIDI out | |
| def dispatch_event(event): | |
| etype = event['type'] | |
| if etype in ('noteOn', 'noteOff'): | |
| note = event['note'] | |
| vel = event.get('velocity', 100) | |
| # Arp logic or direct | |
| if use_arp: | |
| # Send to arpeggiator | |
| if etype == 'noteOn': | |
| st.session_state.arpeggiator.note_on(note) | |
| else: | |
| st.session_state.arpeggiator.note_off(note) | |
| else: | |
| # Trigger directly | |
| if etype == 'noteOn': | |
| note_on(note, vel) | |
| else: | |
| note_off(note) | |
| # Also echo to output port | |
| if st.session_state.midi_out: | |
| if etype == 'noteOn': | |
| out_msg = Message('note_on', note=note, velocity=vel) | |
| st.session_state.midi_out.send(out_msg) | |
| else: | |
| out_msg = Message('note_off', note=note, velocity=0) | |
| st.session_state.midi_out.send(out_msg) | |
| debug_area.write(f"MIDI Note Event -> {event}") | |
| elif etype == 'drum': | |
| # for drum, we have event['padIndex'] | |
| idx = event['padIndex'] | |
| vel = event.get('velocity', 100) | |
| drum_trigger(idx, vel) | |
| debug_area.write(f"Drum Trigger -> Pad {idx}") | |
| else: | |
| pass | |
| # We'll do a short poll in the Streamlit loop | |
| # (In actual usage, you'd use a Streamlit custom component to pass these more elegantly.) | |
| def poll_events(): | |
| while not st.session_state.incoming_events.empty(): | |
| e = st.session_state.incoming_events.get_nowait() | |
| dispatch_event(e) | |
| poll_events() | |
| st.write("Try pressing the on-screen keys/pads or your hardware keyboard/pads. Enjoy!") | |
| # Cleanup | |
| def cleanup(): | |
| st.session_state.arpeggiator.stop() | |
| for note in list(active_oscillators.keys()): | |
| note_off(note) | |
| if st.session_state.midi_in: | |
| st.session_state.midi_in.close() | |
| if st.session_state.midi_out: | |
| st.session_state.midi_out.close() | |
| s.stop() | |
| st.on_session_end(cleanup) | |
| if __name__ == "__main__": | |
| main() | |