from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import copy, os
import random
import json
from . import GSPattern, GSPatternEvent
from . import GSBassmineUtils
def normalize(a):
"""
Normalize a 2D matrix row by row so that each non-empty row sums to 1
Args:
a: 2D numpy array
Returns:
a: the row-normalized matrix (rows are modified in place)
"""
c = 0
for row in a:
factor = sum(row)
if factor > 0:
a[c, :] = row / factor
c += 1
return a
def indices(a, func):
"""
Return the indices of elements in a sequence that satisfy a given predicate
Examples: indices(array, lambda x: x <= 1)
Args:
a: array, list
func: lambda function
Returns:
Indices of 'a' matching lambda function
"""
return [i for (i, val) in enumerate(a) if func(val)]
def pdf_sampling(n):
"""
Sample an index from a discrete probability distribution
Args:
n: list or array of probabilities (expected to sum to 1)
Returns:
Index drawn according to the cumulative distribution of 'n'
"""
x = random.random()
# default to the last index so the tail of the distribution is reachable
f = len(n) - 1
for i in range(1, len(n) + 1):
if x < sum(n[0:i]):
f = i - 1
break
return f
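# A minimal illustrative check (toy weights, not from the corpus): sampled indices
# should roughly follow the given probabilities.
#
#   probs = [0.1, 0.2, 0.7]
#   counts = [0, 0, 0]
#   for _ in range(1000):
#       counts[pdf_sampling(probs)] += 1
#   # counts is expected to be roughly proportional to [100, 200, 700]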
class MarkovModel:
"""
Class to operate with Markov models. Internal class used by the GSBassmineAnalysis module.
Attributes:
model_size: dictionary size of the model
normalized: boolean flag, True once the model has been normalized
support_temporal: internal count matrix used to build the temporal Markov model
support_initial: internal count vector used to build the initial probabilities
support_interlocking: internal count matrix used to build the interlocking Markov model
initial_model: vector storing the normalized initial probabilities
temporal_model: matrix storing the normalized temporal MTM
interlocking_model: matrix storing the normalized interlocking MTM
"""
def __init__(self, model_size, order=1):
"""
Constructor
Args:
model_size: Dictionary size of the model
order: default = 1 (not implemented for other orders yet...)
Returns:
instance of MarkovModel
"""
if type(model_size) == tuple:
if len(model_size) == 2:
self.model_size = model_size
else:
raise NameError("Model size must be an integer or a tuple of the form (x,y)")
elif type(model_size) == int:
self.model_size = (model_size, model_size)
else:
raise NameError("Model size must be an integer or a tuple of the form (x,y)")
# Model state
self.normalized = False
# Temporal model
self.support_temporal = np.zeros(self.model_size, dtype=float)
self.support_initial = np.zeros(self.model_size[0], dtype=float)
# Interlocking model
self.support_interlocking = np.zeros(self.model_size, dtype=float)
# Normalized model
self.initial_model = []
self.temporal_model = []
self.interlocking_model = []
# Pattern dictionary. Used for generation (table index -> onset pattern)
self.model_dictionary = createMarkovGenerationDictionary()
def add_temporal(self, pattern):
"""
Accumulate counts for the temporal Markov transition matrix given a sequence.
The first element updates the initial counts (self.support_initial);
every other element updates the transition counts, and the last element wraps back to the first one.
Args:
pattern: list or array of state-dictionary indices
"""
count = 0
for p in pattern:
if count == 0:
self.update_initial(p)
else:
self.update_temporal(pattern[count - 1], p)
if count == len(pattern) - 1:
# wrap around: the last element also transitions back to the first one
self.update_temporal(p, pattern[0])
count += 1
def add_interlocking(self, patt_kick, patt_bass):
"""
Accumulate counts for the interlocking (concurrency) Markov matrix given two aligned sequences
Args:
patt_kick: kick drum pattern (anchor)
patt_bass: bassline pattern
"""
length = min(len(patt_kick), len(patt_bass))
for i in range(length):
self.update_interlocking(patt_kick[i], patt_bass[i])
def update_interlocking(self, x, y):
"""
internal function to update interlocking matrix
Args:
x: row (anchor)
y: col (bass)
"""
self.support_interlocking[int(x), int(y)] += 1.
def update_temporal(self, x, y):
"""
internal function to update temporal matrix
Args:
x: row (past)
y: col (present)
"""
self.support_temporal[int(x), int(y)] += 1.
def update_initial(self, x):
"""
internal function to update the initial probability counts
Args:
x: state index
"""
self.support_initial[int(x)] += 1.
def normalize_model(self):
"""
Normalize matrices
"""
self.initial_model = self.support_initial / sum(self.support_initial)
self.temporal_model = normalize(self.support_temporal)
self.interlocking_model = normalize(self.support_interlocking)
self.normalized = True
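# A minimal usage sketch (the index sequences are made up, not from a real corpus):
# feed a few bars encoded as dictionary indices, normalize, then query the matrices.
#
#   mm = MarkovModel(16)
#   mm.add_temporal([8, 8, 4, 12])                       # bar 1 of the bassline corpus
#   mm.add_temporal([8, 4, 4, 12])                       # bar 2
#   mm.add_interlocking([8, 8, 8, 8], [8, 8, 4, 12])     # kick (anchor) vs. bass, beat by beat
#   mm.normalize_model()
#   mm.get_initial()         # initial probabilities, length 16
#   mm.get_temporal()        # 16x16 row-normalized transition matrix
#   mm.get_interlocking()    # 16x16 row-normalized concurrency matrix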
def get_initial(self):
"""
Get initial probabilities
Returns:
initial_model: list
"""
return self.initial_model
def get_temporal(self):
"""
Get temporal model
Returns:
temporal_model: 2D matrix
"""
return self.temporal_model
def get_interlocking(self):
"""
Get interlocking model
Returns:
interlocking_model: 2D matrix
"""
return self.interlocking_model
def rhythm_model(self, _path="output/"):
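"""
Build the rhythm (temporal) model as plain dictionaries and export them to JSON.
The temporal transition model is written to 'HModel.json' and the initial probabilities to 'HModel_init.json' inside _path.
Args:
_path: path where the JSON files will be stored
Returns:
[init_dict, rhythm_dict]
"""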
path = _path
init_set = markov_tm_2dict(self.initial_model)
#print( self.initial_model)
init = []
init_dict = {}
init_dict['initial'] = {}
init_dict['initial']['pattern'] = [int(r) for r in init_set]
init_dict['initial']['prob'] = [self.initial_model[i] for i in init_dict['initial']['pattern']]
#print( "init\n", init_dict)
rhythm_temporal_model = markov_tm_2dict(self.get_temporal())
rhythm_dict = dict()
#print((pitch_temporal_model))
## Temporal model
for key,val in rhythm_temporal_model.items():
rhythm_dict[key] = {}
#print( key # parent)
#print( list(val) # child)
tmp = [] # child
for v in val:
tmp.append(self.get_temporal()[key, int(v)])
#print( list(tmp/sum(tmp)))
rhythm_dict[key]['pattern'] = [int(r) for r in rhythm_temporal_model[key]]
rhythm_dict[key]['probs'] = list(tmp/sum(tmp))
ePath = os.path.abspath(os.path.join(path,'HModel.json'))
with open( ePath , 'w') as outfile:
json.dump(rhythm_dict, outfile)
outfile.close()
with open( path + 'HModel_init.json', 'w') as outfile:
json.dump(init_dict, outfile)
outfile.close()
#print( "Rhythm model computed\n", rhythm_dict)
return [init_dict, rhythm_dict]
def pitch_model(self):
"""
Build pitch model
[work in progress...]
Takes the first note and assumes it is the root note. Other notes are represented as intervals relative to that root (first note)
Returns:
dictionary{0: {'interval': , 'probs'}, {},{},...}
"""
pitch_temporal_model = markov_tm_2dict(self.get_temporal())
pitch_dict = dict()
#print((pitch_temporal_model))
## Temporal model
for key,val in pitch_temporal_model.items():
pitch_dict[key-12] = {}
#print( key # parent)
#print( list(val) # child)
tmp = [] # child
for v in val:
tmp.append(self.get_temporal()[key, int(v)])
#print( list(tmp/sum(tmp)))
pitch_dict[key-12]['interval'] = [int(x)-12 for x in val]
pitch_dict[key-12]['probs'] = list(tmp/sum(tmp))
print( "Pitch model computed\n", pitch_dict)
return pitch_dict
def constrainMM(markov_model, target, _path="output/"):
"""
Compute a non-homogeneous Markov model (NHMM) based on the interlocking constraint.
Given a target pattern it constrains the original model and enforces arc-consistency.
This function is also implemented as a pyext class.
Args:
markov_model: MarkovModel instance (output from GSBassmineUtils.corpus_analysis())
target: GSPattern with the target kick pattern for interlocking (converted internally to pattern ids).
_path: path where the Markov models will be stored
Returns:
Interlocking model as dictionary in JSON format
"""
path = _path
b0 = copy.copy(markov_model.get_initial())
b = copy.copy(markov_model.get_temporal())
inter = copy.copy(markov_model.get_interlocking())
# b and inter are converted to dictionaries {row>0 : (idx columns>0)}
# Domains
Dom_init = markov_tm_2dict(b0)
Dom_B = markov_tm_2dict(b)
Dom_I = markov_tm_2dict(inter)
#print( "Initial dict ", Dom_init)
#print( "Temporal dict ", Dom_B)
#print( "Interlocking dict ", Dom_I)
## Representation of target kick pattern as variable domain
target = GSBassmineUtils.translate_rhythm(GSBassmineUtils.binaryBeatPattern([e.startTime for e in target.events],target.duration))
target_setlist = []
for t in target:
# RELAXATION RULE: if the target kick is not in the model consider metronome pulse as kick
if t in Dom_I:
target_setlist.append(Dom_I[t])
else:
target_setlist.append(Dom_I[8])
print( "Kick pattern mistmatch")
#print( target_setlist)
## V store the domain of patterns at each step
V = []
filter_init = Dom_init.intersection(target_setlist[0])
#print( list(filter_init))
## Look for possible continuations of filter_init in Dom_B, constrained to target_list[1]
V.append(dict())
tmp = []
for f in filter_init:
# print( "Possible intital continuations",f, Dom_B[int(f)])
# print( "Kick constrain", target_setlist[1])
# print( "Intersection", Dom_B[int(f)].intersection(target_setlist[1]))
if len(Dom_B[int(f)].intersection(target_setlist[1])) > 0:
V[0][int(f)] = Dom_B[int(f)].intersection(target_setlist[1])
tmp.append(f)
# print( "\n\n")
#print( "Kick constr", list(target_setlist[0]))
#print( "V0", V[0])
#print( "V1", V[1].keys() # Domain for step 1 / rows of transition matrix)
#print( V[1])
## Create rest of V
##############################################
## Make a backtrack-free non-homogeneous Markov model of order 1
##############################################
for step in range(1, len(target)-1):
V.append(dict())
#print( "Kick constr", list(target_setlist[step]))
# for each v in V[step] keep continuations that match interlocking with step+1
for t in target_setlist[step]:
if len(Dom_B[int(t)].intersection(target_setlist[step+1])):
V[step][int(t)] = Dom_B[int(t)].intersection(target_setlist[step+1])
#print( "check\n")
#print( "V", step, V[step].keys())
## Delete values from each key in V[i] that are not in V[i+1]
val_del = dict()
## Forward propagation
for step in range(1,len(target)-1):
val_del_temp = []
next_key = set([str(x) for x in V[step].keys()])
#print( next_key)
for key, value in V[step-1].items():
#print( key, value)
tmp_int = value.intersection(next_key)
#if len(tmp_int) > 0:
V[step-1][key] = tmp_int
if len(tmp_int) == 0:
val_del_temp.append(key)
val_del[step] = val_del_temp
#print( val_del)
## Back-propagation
for step, value in val_del.items():
if len(value) > 0:
for v in value:
## Delete key
V[step-1].pop(v, None)
## Delete in previous continuations
#print( V[step-2])
for idx in V[step-2].keys():
V[step-2][idx] = set([str(x) for x in V[step-1].keys()]).intersection(V[step-2][idx])
# BUILD FINAL DICTIONARY
#print( "\nFinal Model:")
out_Model = {}
init = []
init_dict = dict()
for key in V[0]:
init.append(b0[key])
init_dict['initial'] = dict()
init_dict['initial']['prob'] = list(init/sum(init))
init_dict['initial']['pattern'] = list(V[0].keys())  # list() keeps it JSON-serializable on Python 3
#print( init_dict)
for i in range(len(V)):
out_Model[i] = {}
#print( "step:",i)
for key,val in V[i].items():
out_Model[i][key] = {}
#print( key # parent)
#print( list(val) # child)
tmp = [] # child
for v in val:
tmp.append(b[key, int(v)])
#print( list(tmp/sum(tmp)))
out_Model[i][key]['pattern'] = [int(x) for x in val]
out_Model[i][key]['probs'] = list(tmp/sum(tmp))
#print( out_Model)
with open( path + 'NHModel.json', 'w') as outfile:
json.dump(out_Model, outfile)
outfile.close()
with open( path + 'Model_init.json', 'w') as outfile:
json.dump(init_dict, outfile)
outfile.close()
#print(("Interlocking model build!"))
return [init_dict, out_Model]
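# A hedged usage sketch of constrainMM ('mm' and 'kick_pattern' are assumed names):
# given an analyzed MarkovModel and a kick GSPattern, the returned pair can be
# sampled step by step, exactly as generateBassRhythm() does below.
#
#   init_dict, nhmm = constrainMM(mm, kick_pattern, _path="output/")
#   first = np.random.choice(init_dict['initial']['pattern'], p=init_dict['initial']['prob'])
#   nhmm[0][first]['pattern'], nhmm[0][first]['probs']   # continuations allowed at step 0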
def variationMM(markov_model, target, _path="output/"):
"""
Compute a non-homogeneous Markov model (NHMM) based on the variation constraint.
Given a target Variation Mask (VM), it constrains the original model and enforces arc-consistency.
Examples:
The VM should be a list representing a pattern (id formatted) with negative numbers on the frames to vary.
Positive values will be preserved.
Args:
markov_model: MarkovModel instance (output from GSBassmineUtils.corpus_analysis())
target: Variation Mask (list with negative numbers indicating variation of that time frame)
_path: path where the Markov models will be stored
Returns:
Variation model as a dictionary in JSON format
"""
path = _path
b = copy.copy(markov_model.get_temporal())
## Domains
#Dom_init = markov_tm_2dict(b0)
Dom_B = markov_tm_2dict(b)
#Dom_I = markov_tm_2dict(inter)
#print( "Initial dict ", Dom_init)
#print( "Temporal dict ", Dom_B)
#print( "Interlocking dict ", Dom_I)
# target
#target = [8,8,4,-2,2,0,4,-2,8]
# Create V : variable domain for transitions
V = []
# target
#target = [8,8,4,14,4,10,8,2,8]
V.append(dict())
if target[1] >= 0:
V[0][target[0]] = {str(target[1])}
else:
V[0][target[0]] = Dom_B[target[0]]
for i in range(1,len(target)-1):
V.append(dict())
if target[i] >= 0: # Current beat doesn't vary
for key, value in V[i-1].items():
# store keys that match target[i]
tmp_key = value.intersection({str(target[i])})
if len(tmp_key) > 0:
V[i][int(list(tmp_key)[0])] = Dom_B[int(list(tmp_key)[0])]
#V[i][target[i-1]] = set([str(target[i])])
else: # Current beat varies
#Check possible continuations from V[i-1] as Key candidates in V[i]
for key, value in V[i-1].items():
for v in value:
V[i][int(v)] = Dom_B[int(v)]
#print( "h")
#V.append(dict())
#V[len(target)-1][target[len(target)-1]] = set([str(target[len(target)-1])])
#print( V)
## Delete values from each key in V[i] that are not in V[i+1]
val_del = dict()
## Forward propagation
for step in range(1,len(target)-1):
val_del_temp = []
next_key = set([str(x) for x in V[step].keys()])
#print( next_key)
for key, value in V[step-1].items():
#print( key, value)
tmp_int = value.intersection(next_key)
#if len(tmp_int) > 0:
V[step-1][key] = tmp_int
if len(tmp_int) == 0:
val_del_temp.append(key)
val_del[step] = val_del_temp
#print( val_del)
## Back-propagation
for step, value in val_del.items():
if len(value) > 0:
for v in value:
## Delete key
V[step-1].pop(v, None)
## Delete in previous continuations
#print( V[step-2])
for idx in V[step-2].keys():
V[step-2][idx] = set([str(x) for x in V[step-1].keys()]).intersection(V[step-2][idx])
# BUILD FINAL DICTIONARY
#print( "\nFinal Model:")
for key,val in V[len(V)-1].items():
tmp_val = val.intersection({str(target[len(target) - 1])})
#print( tmp_val)
if len(tmp_val)>0:
V[len(V)-1][key] = tmp_val
#for v in V:
# print( v)
# print( "\n")
out_Model = {}
# Force last beat
#init = []
init_dict = dict()
#for key in V[0]:
# init.append(b0[key])
init_dict['initial'] = dict()
init_dict['initial']['prob'] = 1.00
init_dict['initial']['pattern'] = target[0]
#print( init_dict)
for i in range(len(V)):
out_Model[i] = {}
#print( "step:",i)
for key,val in V[i].items():
out_Model[i][key] = {}
#print( key # parent)
#print( list(val) # child)
tmp = [] # child
for v in val:
tmp.append(b[key, int(v)])
#print( list(tmp/sum(tmp)))
#print( tmp)
out_Model[i][key]['pattern'] = [int(x) for x in val]
out_Model[i][key]['probs'] = list(tmp/sum(tmp))
#print( out_Model)
with open( path + 'NHModel_var.json', 'w') as outfile:
json.dump(out_Model, outfile)
outfile.close()
with open( path + 'Model_init_var.json', 'w') as outfile:
json.dump(init_dict, outfile)
outfile.close()
print(("Variation model build!"))
return [init_dict, out_Model]
def markov_tm_2dict(a):
"""
Convert a Markov transition matrix to a dictionary of sets.
Compact representation that avoids sparse matrices and gives better performance in the constrained models
Args:
a: MarkovModel temporal/interlocking/pitch matrix (2D) or initial probability vector (1D)
Returns:
dictionary of sets {row index: set of non-zero column indices as strings} for 2D input, or a single set for 1D input
"""
out = dict()
key = 0
if len(a.shape) == 2:
for row in a:
value = 0
tmp_dom = []
if sum(row) > 0:
for col in row:
if col > 0:
tmp_dom.append(str(value))
value += 1
out[key] = set(tmp_dom)
key += 1
return out
elif len(a.shape) == 1:
tmp_dom = []
value = 0
for col in a:
if col > 0:
tmp_dom.append(str(value))
value += 1
return set(tmp_dom)
else:
print("Wrong size: expected a 1D or 2D array")
def createMarkovGenerationDictionary(toJSON=False, _path="output/"):
"""
Internal function to build a dictionary mapping pattern indices (binarized onset patterns) to start times (in beats)
Args:
toJSON: boolean. If True the dictionary is also exported to JSON
_path: path used to store the dictionary as JSON
Returns: Pattern dictionary
"""
start_times = [0,0.25,0.5,0.75]
out = dict()
out['patterns'] = dict()
for p in range(16):
# Binary pattern
binpatt = format(p, '04b')
st = []
for i in range(len(start_times)):
if binpatt[i] == '1':
st.append((start_times[i]))
val = st
out['patterns'][p] = val
#print( out)
path = _path
name = 'pattern_dict'
data = out
if toJSON:
with open(path + name + '.json', 'w') as outfile:
json.dump(data, outfile)
return data
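# Illustrative contents of the generated dictionary: pattern index 5 is '0101' in
# binary, i.e. onsets on the 2nd and 4th sixteenth of the beat.
#
#   d = createMarkovGenerationDictionary()
#   d['patterns'][0]     # -> []                      (silent beat)
#   d['patterns'][5]     # -> [0.25, 0.75]
#   d['patterns'][15]    # -> [0, 0.25, 0.5, 0.75]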
def generateBassRhythm(markov_model, beat_length=8, target=[]):
"""
Generate a rhythmic bassline. If no target is given the system assumes no constraints and uses the regular
Markov model (computed by MarkovModel.rhythm_model()).
If a target is given it is used as the constraint for the interlocking model.
Args:
markov_model: MarkovModel instance
beat_length: desired length of the generated pattern (in beats)
target: GSPattern used as constraint in the interlocking model.
Returns:
bassline: GSPattern containing the bassline onset pattern
"""
bassline = GSPattern()
pattern_idx = []
if len(target) == 0: # no constraints
HMM = markov_model.rhythm_model()
pattern_idx.append(np.random.choice(HMM[0]['initial']['pattern'], p=HMM[0]['initial']['prob']))
for beat in range(beat_length-1):
pattern_idx.append(np.random.choice(HMM[1][pattern_idx[beat]]['pattern'],
p=HMM[1][pattern_idx[beat]]['probs']))
else: # use constrained model
if len(target) < beat_length:
beat_length = len(target)
NHMM = constrainMM(markov_model, target)
pattern_idx.append(np.random.choice(NHMM[0]['initial']['pattern'], p=NHMM[0]['initial']['prob']))
for beat in range(beat_length-1):
pattern_idx.append(np.random.choice(NHMM[1][beat][pattern_idx[beat]]['pattern'],
p=NHMM[1][beat][pattern_idx[beat]]['probs']))
bassline.duration = len(pattern_idx)
start_times = []
beat_count = 0
for idx in pattern_idx:
st = markov_model.model_dictionary['patterns'][idx]
if len(st)>0:
for s in st:
bassline.events.append(GSPatternEvent(s + beat_count,0.25,36,velocity=110,tag="bass"))
beat_count += 1
return bassline
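# A hedged end-to-end sketch ('mm' and 'kick_pattern' are assumed to come from a
# corpus analysis step, e.g. via GSBassmineUtils/GSBassmineAnalysis): generate an
# unconstrained 8-beat bassline, or pass a kick GSPattern to use the interlocking model.
#
#   free_bass = generateBassRhythm(mm, beat_length=8)
#   locked_bass = generateBassRhythm(mm, target=kick_pattern)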
def _generateBassRhythm(markov_model, beat_length=8, target=[]):
"""
Generate a rhythmic bassline. If no target is given the system assumes no constraints and uses the regular
Markov model (computed by MarkovModel.rhythm_model()).
If a target is given it is used as the constraint for the interlocking model.
Args:
markov_model: output from MarkovModel.rhythm_model()
beat_length: desired length of the generated pattern
target: Pattern used as constraint in Interlocking Model.
Returns:
bassline: GSPattern containing bassline onset pattern
"""
bassline = GSPattern()
pattern_idx = []
markovDict = createMarkovGenerationDictionary()
pattern_idx.append(np.random.choice(markov_model[0]['initial']['pattern'], p=markov_model[0]['initial']['prob']))
for beat in range(beat_length-1):
pattern_idx.append(np.random.choice(markov_model[1][beat][pattern_idx[beat]]['pattern'],
p=markov_model[1][beat][pattern_idx[beat]]['probs']))
bassline.duration = len(pattern_idx)
start_times = []
beat_count = 0
for idx in pattern_idx:
st = markovDict['patterns'][idx]
if len(st)>0:
for s in st:
bassline.events.append(GSPatternEvent(s + beat_count,0.25,36,velocity=110,tag="bass"))
beat_count += 1
return bassline
def generateBassRhythmVariation(markov_model, target_pattern, variation_mask):
"""
Implements a variation model given an already generated pattern. Based on the variation mask it builds
a Markov model that preserves the desired beats while modelling the "variation" beats to be stylistically consistent.
Args:
markov_model: MarkovModel instance
target_pattern: GSPattern, for instance the output of generateBassRhythm()
variation_mask: list of the same length as target_pattern (in beats). Positions with value 1 will be preserved,
those with -1 will vary.
Returns:
bassline: generated GSPattern
"""
bassline = GSPattern()
pattern_idx = []
if len(variation_mask) < target_pattern.duration:
print( "Variation mask must be same length as target_pattern (in beats)")
return
else:
#Convert target pattern to markov dictionary
onset_target = [x.startTime for x in target_pattern.events]
target_rhythm = GSBassmineUtils.binaryBeatPattern(onset_target, target_pattern.duration)
target_id = GSBassmineUtils.translate_rhythm(target_rhythm)
#Mask formatted pattern
masked_target = [target_id[i] * variation_mask[i] for i in range(len(variation_mask) - 1)]
#Build variation model
NHMM = variationMM(markov_model, masked_target)
#Generate GSPattern
pattern_idx.append(target_id[0])
for beat in range(len(masked_target) - 1):
pattern_idx.append(np.random.choice(NHMM[1][beat][pattern_idx[beat]]['pattern'],
p=NHMM[1][beat][pattern_idx[beat]]['probs']))
bassline.duration = len(pattern_idx)
beat_count = 0
for idx in pattern_idx:
st = markov_model.model_dictionary['patterns'][idx]
if len(st)>0:
for s in st:
bassline.events.append(GSPatternEvent(s + beat_count,0.25,36,velocity=110,tag=("bass")))
beat_count += 1
return bassline
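# A hedged usage sketch of the variation model (mask values follow the docstring
# convention: 1 keeps a beat, -1 lets it vary; 'mm' and 'locked_bass' are the
# assumed names from the sketches above).
#
#   mask = [1, 1, -1, 1, 1, -1, 1, 1]    # vary beats 3 and 6 of an 8-beat pattern
#   varied = generateBassRhythmVariation(mm, locked_bass, mask)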