Changes to add the read only cores and some fixes to bugs
for processing the config file.
Change-Id: I1393c05d4cbce215e97d1f17b13e044eda7ae472
diff --git a/arouterd/arouterd.go b/arouterd/arouterd.go
index bd2787a..0bcff08 100644
--- a/arouterd/arouterd.go
+++ b/arouterd/arouterd.go
@@ -50,17 +50,18 @@
Port uint64 `json:"Port"`
}
-type rwPod struct {
+type volthaPod struct {
name string
ipAddr string
node string
devIds map[string]struct{}
+ cluster string
backend string
connection string
}
type podTrack struct {
- pod *rwPod
+ pod *volthaPod
dn bool
}
@@ -124,8 +125,8 @@
return nil,errors.New("Timeout attempting to conect")
}
-func getRwPods(cs *kubernetes.Clientset, coreFilter * regexp.Regexp) []*rwPod {
- var rtrn []*rwPod
+func getVolthaPods(cs *kubernetes.Clientset, coreFilter * regexp.Regexp) []*volthaPod {
+ var rtrn []*volthaPod
pods, err := cs.CoreV1().Pods("").List(metav1.ListOptions{})
if err != nil {
@@ -140,7 +141,7 @@
// Only add the pod if it has an IP address. If it doesn't then it likely crashed and
// and is still in the process of getting re-started.
if v.Status.PodIP != "" {
- rtrn = append(rtrn, &rwPod{name:v.Name,ipAddr:v.Status.PodIP,node:v.Spec.NodeName,
+ rtrn = append(rtrn, &volthaPod{name:v.Name,ipAddr:v.Status.PodIP,node:v.Spec.NodeName,
devIds:make(map[string]struct{}), backend:"", connection:""})
}
}
@@ -148,7 +149,7 @@
return rtrn
}
-func reconcilePodDeviceIds(pod * rwPod, ids map[string]struct{}) bool {
+func reconcilePodDeviceIds(pod * volthaPod, ids map[string]struct{}) bool {
var idList cmn.IDs
for k,_ := range ids {
idList.Items = append(idList.Items, &cmn.ID{Id:k})
@@ -169,7 +170,7 @@
return true
}
-func queryPodDeviceIds(pod * rwPod) map[string]struct{} {
+func queryPodDeviceIds(pod * volthaPod) map[string]struct{} {
var rtrn map[string]struct{} = make(map[string]struct{})
// Open a connection to the pod
// port 50057
@@ -192,7 +193,7 @@
return rtrn
}
-func queryDeviceIds(pods []*rwPod) {
+func queryDeviceIds(pods []*volthaPod) {
for pk,_ := range pods {
// Keep the old Id list if a new list is not returned
if idList := queryPodDeviceIds(pods[pk]); len(idList) != 0 {
@@ -201,7 +202,7 @@
}
}
-func allEmpty(pods []*rwPod) bool {
+func allEmpty(pods []*volthaPod) bool {
for k,_ := range pods {
if len(pods[k].devIds) != 0 {
return false
@@ -210,13 +211,13 @@
return true
}
-func rmPod(pods []*rwPod, idx int) []*rwPod {
+func rmPod(pods []*volthaPod, idx int) []*volthaPod {
return append(pods[:idx],pods[idx+1:]...)
}
-func groupIntersectingPods1(pods []*rwPod, podCt int) ([][]*rwPod,[]*rwPod) {
- var rtrn [][]*rwPod
- var out []*rwPod
+func groupIntersectingPods1(pods []*volthaPod, podCt int) ([][]*volthaPod,[]*volthaPod) {
+ var rtrn [][]*volthaPod
+ var out []*volthaPod
for {
if len(pods) == 0 {
@@ -229,7 +230,7 @@
continue
}
// Start a pod group with this pod
- var grp []*rwPod
+ var grp []*volthaPod
grp = append(grp, pods[0])
pods = rmPod(pods,0)
//log.Debugf("Creating new group %s", pd[k].pod.name)
@@ -277,7 +278,7 @@
}
-func sameNode(pod *rwPod, grps [][]*rwPod) bool {
+func sameNode(pod *volthaPod, grps [][]*volthaPod) bool {
for _,v := range grps {
if v[0].node == pod.node {
return true
@@ -289,14 +290,14 @@
return false
}
-func startRemainingGroups1(grps [][]*rwPod, pods []*rwPod, podCt int) ([][]*rwPod, []*rwPod) {
- var grp []*rwPod
+func startRemainingGroups1(grps [][]*volthaPod, pods []*volthaPod, podCt int) ([][]*volthaPod, []*volthaPod) {
+ var grp []*volthaPod
for k,_ := range pods {
if sameNode(pods[k], grps) {
continue
}
- grp = []*rwPod{}
+ grp = []*volthaPod{}
grp = append(grp, pods[k])
pods = rmPod(pods, k)
grps = append(grps, grp)
@@ -307,7 +308,7 @@
return grps, pods
}
-func hasSingleSecondNode(grp []*rwPod) bool {
+func hasSingleSecondNode(grp []*volthaPod) bool {
var srvrs map[string]struct{} = make(map[string]struct{})
for k,_ := range grp {
if k == 0 {
@@ -321,7 +322,7 @@
return false
}
-func addNode(grps [][]*rwPod, idx *rwPod, item *rwPod) [][]*rwPod {
+func addNode(grps [][]*volthaPod, idx *volthaPod, item *volthaPod) [][]*volthaPod {
for k,_ := range grps {
if grps[k][0].name == idx.name {
grps[k] = append(grps[k], item)
@@ -332,7 +333,7 @@
return grps
}
-func removeNode(grps [][]*rwPod, item *rwPod) [][]*rwPod {
+func removeNode(grps [][]*volthaPod, item *volthaPod) [][]*volthaPod {
for k,_ := range grps {
for k1,_ := range grps[k] {
if grps[k][k1].name == item.name {
@@ -344,8 +345,8 @@
return grps
}
-func groupRemainingPods1(grps [][]*rwPod, pods []*rwPod) [][]*rwPod {
- var lgrps [][]*rwPod
+func groupRemainingPods1(grps [][]*volthaPod, pods []*volthaPod) [][]*volthaPod {
+ var lgrps [][]*volthaPod
// All groups must be started when this function is called.
// Copy incomplete groups
for k,_ := range grps {
@@ -402,8 +403,8 @@
return grps
}
-func groupPods1(pods []*rwPod) [][]*rwPod {
- var rtrn [][]*rwPod
+func groupPods1(pods []*volthaPod) [][]*volthaPod {
+ var rtrn [][]*volthaPod
var podCt int = len(pods)
rtrn,pods = groupIntersectingPods1(pods, podCt)
@@ -435,9 +436,9 @@
return false
}
-func setConnection(client pb.ConfigurationClient, backend string, connection string, addr string, port uint64) {
+func setConnection(client pb.ConfigurationClient, cluster string, backend string, connection string, addr string, port uint64) {
log.Debugf("Configuring backend %s : connection %s\n\n", backend, connection)
- cnf := &pb.Conn{Server:"grpc_command",Cluster:"vcore",Backend:backend,
+ cnf := &pb.Conn{Server:"grpc_command",Cluster:cluster, Backend:backend,
Connection:connection,Addr:addr,
Port:port}
if res, err := client.SetConnection(context.Background(), cnf); err != nil {
@@ -461,7 +462,7 @@
}
}
-func getBackendForCore(coreId string, coreGroups [][]*rwPod) string {
+func getBackendForCore(coreId string, coreGroups [][]*volthaPod) string {
for _,v := range coreGroups {
for _,v2 := range v {
if v2.name == coreId {
@@ -475,7 +476,7 @@
func monitorDiscovery(client pb.ConfigurationClient,
ch <-chan *ic.InterContainerMessage,
- coreGroups [][]*rwPod) {
+ coreGroups [][]*volthaPod) {
var id map[string]struct{} = make(map[string]struct{})
select {
@@ -498,7 +499,7 @@
}
func startDiscoveryMonitor(client pb.ConfigurationClient,
- coreGroups [][]*rwPod) error {
+ coreGroups [][]*volthaPod) error {
var ch <-chan *ic.InterContainerMessage
// Connect to kafka for discovery events
topic := &kafka.Topic{Name: "AffinityRouter"}
@@ -517,16 +518,16 @@
// have changed based on the list provided
// and returns a coreGroup with only the changed
// items and a pod list with the new items
-func getAddrDiffs(coreGroups [][]*rwPod, rwPods []*rwPod) ([][]*rwPod, []*rwPod) {
- var nList []*rwPod
- var rtrn [][]*rwPod = make([][]*rwPod, nPods>>1)
+func getAddrDiffs(coreGroups [][]*volthaPod, rwPods []*volthaPod) ([][]*volthaPod, []*volthaPod) {
+ var nList []*volthaPod
+ var rtrn [][]*volthaPod = make([][]*volthaPod, nPods>>1)
var ipAddrs map[string]struct{} = make(map[string]struct{})
log.Debug("Get addr diffs")
// Start with an empty array
for k,_ := range rtrn {
- rtrn[k] = make([]*rwPod, 2)
+ rtrn[k] = make([]*volthaPod, 2)
}
// Build a list with only the new items
@@ -553,8 +554,8 @@
// pods being replaced. The criteria is that
// the new pod be on the same server as the
// old pod was.
-func reconcileAddrDiffs(coreGroupDiffs [][]*rwPod, rwPodDiffs []*rwPod) ([][]*rwPod) {
- var srvrs map[string][]*rwPod = make(map[string][]*rwPod)
+func reconcileAddrDiffs(coreGroupDiffs [][]*volthaPod, rwPodDiffs []*volthaPod) ([][]*volthaPod) {
+ var srvrs map[string][]*volthaPod = make(map[string][]*volthaPod)
log.Debug("Reconciling diffs")
log.Debug("Building server list")
@@ -587,41 +588,82 @@
return coreGroupDiffs
}
-func applyAddrDiffs(client pb.ConfigurationClient, coreGroups [][]*rwPod, rwPods []*rwPod) {
- var newEntries [][]*rwPod
+func applyAddrDiffs(client pb.ConfigurationClient, coreList interface{}, nPods []*volthaPod) {
+ var newEntries [][]*volthaPod
log.Debug("Applying diffs")
- newEntries = reconcileAddrDiffs(getAddrDiffs(coreGroups, rwPods))
+ switch cores := coreList.(type) {
+ case [][]*volthaPod:
+ newEntries = reconcileAddrDiffs(getAddrDiffs(cores, nPods))
- // Now replace the information in coreGropus with the new
- // entries and then reconcile the device ids on the core
- // that's in the new entry with the device ids of it's
- // active-active peer.
- for k1,v1 := range coreGroups {
- for k2,v2 := range v1 {
- if newEntries[k1][k2] != nil {
- // TODO: Missing is the case where bothe the primary
- // and the secondary core crash and come back.
- // Pull the device ids from the active-active peer
- ids := queryPodDeviceIds(coreGroups[k1][k2^1])
- if len(ids) != 0 {
- if reconcilePodDeviceIds(newEntries[k1][k2], ids) == false {
- log.Errorf("Attempt to reconcile ids on pod %v failed",newEntries[k1][k2])
+ // Now replace the information in coreGropus with the new
+ // entries and then reconcile the device ids on the core
+ // that's in the new entry with the device ids of it's
+ // active-active peer.
+ for k1,v1 := range cores {
+ for k2,v2 := range v1 {
+ if newEntries[k1][k2] != nil {
+ // TODO: Missing is the case where bothe the primary
+ // and the secondary core crash and come back.
+ // Pull the device ids from the active-active peer
+ ids := queryPodDeviceIds(cores[k1][k2^1])
+ if len(ids) != 0 {
+ if reconcilePodDeviceIds(newEntries[k1][k2], ids) == false {
+ log.Errorf("Attempt to reconcile ids on pod %v failed",newEntries[k1][k2])
+ }
}
+ // Send the affininty router new connection information
+ setConnection(client, "vcore", v2.backend, v2.connection, newEntries[k1][k2].ipAddr, 50057)
+ // Copy the new entry information over
+ cores[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
+ cores[k1][k2].name = newEntries[k1][k2].name
+ cores[k1][k2].devIds = ids
}
- // Send the affininty router new connection information
- setConnection(client, v2.backend, v2.connection, newEntries[k1][k2].ipAddr, 50057)
- // Copy the new entry information over
- coreGroups[k1][k2].ipAddr = newEntries[k1][k2].ipAddr
- coreGroups[k1][k2].name = newEntries[k1][k2].name
- coreGroups[k1][k2].devIds = ids
}
}
+ case []*volthaPod:
+ var mia []*volthaPod
+ var found bool
+ // TODO: Break this using functions to simplify
+ // reading of the code.
+ // Find the core(s) that have changed addresses
+ for k1,v1 := range cores {
+ found = false
+ for _, v2 := range nPods {
+ if v1.ipAddr == v2.ipAddr {
+ found = true
+ break
+ }
+ }
+ if found == false {
+ mia = append(mia, cores[k1])
+ }
+ }
+ // Now plug in the new addresses and set the connection
+ for _,v1 := range nPods {
+ found = false
+ for _,v2 := range cores {
+ if v1.ipAddr == v2.ipAddr {
+ found = true
+ break
+ }
+ }
+ if found == true {
+ continue
+ }
+ mia[0].ipAddr = v1.ipAddr
+ mia[0].name = v1.name
+ setConnection(client, "ro_vcore", mia[0].backend, mia[0].connection, v1.ipAddr, 50057)
+ // Now get rid of the mia entry just processed
+ mia = append(mia[:0],mia[1:]...)
+ }
+ default:
+ log.Error("Internal: Unexpected type in call to applyAddrDiffs");
}
}
-func updateDeviceIds(coreGroups [][]*rwPod, rwPods []*rwPod) {
- var byName map[string]*rwPod = make(map[string]*rwPod)
+func updateDeviceIds(coreGroups [][]*volthaPod, rwPods []*volthaPod) {
+ var byName map[string]*volthaPod = make(map[string]*volthaPod)
// Convinience
for _,v := range rwPods {
@@ -637,8 +679,10 @@
func startCoreMonitor(client pb.ConfigurationClient,
clientset *kubernetes.Clientset,
- coreFltr *regexp.Regexp,
- coreGroups [][]*rwPod) error {
+ rwCoreFltr *regexp.Regexp,
+ roCoreFltr *regexp.Regexp,
+ coreGroups [][]*volthaPod,
+ oRoPods []*volthaPod) error {
// Now that initial allocation has been completed, monitor the pods
// for IP changes
// The main loop needs to do the following:
@@ -657,7 +701,7 @@
for {
time.Sleep(10 * time.Second) // Wait a while
// Get the rw core list from k8s
- rwPods := getRwPods(clientset, coreFltr)
+ rwPods := getVolthaPods(clientset, rwCoreFltr)
queryDeviceIds(rwPods)
updateDeviceIds(coreGroups, rwPods)
// If we didn't get 2n+1 pods then wait since
@@ -673,19 +717,43 @@
if hasIpAddr(coreGroups, v.ipAddr) == false {
log.Debug("Address has changed...")
applyAddrDiffs(client, coreGroups, rwPods)
-
+ break
}
}
+
+ roPods := getVolthaPods(clientset, roCoreFltr)
+
+ if len(roPods) != 3 {
+ continue
+ }
+ for _,v := range roPods {
+ if hasIpAddr(oRoPods, v.ipAddr) == false {
+ applyAddrDiffs(client, oRoPods, roPods)
+ break
+ }
+ }
+
}
}
-func hasIpAddr(coreGroups [][]*rwPod, ipAddr string) bool {
- for _,v1 := range coreGroups {
- for _,v2 := range v1 {
- if v2.ipAddr == ipAddr {
+func hasIpAddr(coreList interface{}, ipAddr string) bool {
+ switch cores := coreList.(type) {
+ case []*volthaPod:
+ for _,v := range cores {
+ if v.ipAddr == ipAddr {
return true
}
}
+ case [][]*volthaPod:
+ for _,v1 := range cores {
+ for _,v2 := range v1 {
+ if v2.ipAddr == ipAddr {
+ return true
+ }
+ }
+ }
+ default:
+ log.Error("Internal: Unexpected type in call to hasIpAddr")
}
return false
}
@@ -700,7 +768,8 @@
// Set up the regular expression to identify the voltha cores
- coreFltr := regexp.MustCompile(`rw-core[0-9]-`)
+ rwCoreFltr := regexp.MustCompile(`rw-core[0-9]-`)
+ roCoreFltr := regexp.MustCompile(`ro-core-`)
// Set up logging
if _, err := log.SetDefaultLogger(log.JSON, 0, nil); err != nil {
@@ -719,7 +788,7 @@
client := pb.NewConfigurationClient(conn)
// Get the voltha rw-core podes
- rwPods := getRwPods(clientset, coreFltr)
+ rwPods := getVolthaPods(clientset, rwCoreFltr)
// Fetch the devices held by each running core
queryDeviceIds(rwPods)
@@ -734,6 +803,7 @@
// Assign the groupings to the the backends and connections
for k,_ := range coreGroups {
for k1,_ := range coreGroups[k] {
+ coreGroups[k][k1].cluster = "vcore"
coreGroups[k][k1].backend = "vcore"+strconv.Itoa(k+1)
coreGroups[k][k1].connection = "vcore"+strconv.Itoa(k+1)+strconv.Itoa(k1+1)
}
@@ -755,15 +825,28 @@
log.Info("Setting connections")
// Configure the backeds based on the calculated core groups
for _,v := range coreGroups {
- setConnection(client, v[0].backend, v[0].connection, v[0].ipAddr, 50057)
- setConnection(client, v[1].backend, v[1].connection, v[1].ipAddr, 50057)
+ setConnection(client, "vcore", v[0].backend, v[0].connection, v[0].ipAddr, 50057)
+ setConnection(client, "vcore", v[1].backend, v[1].connection, v[1].ipAddr, 50057)
+ }
+
+ // Process the read only pods
+ roPods := getVolthaPods(clientset, roCoreFltr)
+ for k,v := range roPods {
+ log.Debugf("Processing ro_pod %v", v)
+ vN := "ro_vcore"+strconv.Itoa(k+1)
+ log.Debugf("Setting connection %s, %s, %s", vN, vN+"1", v.ipAddr)
+ roPods[k].cluster = "ro_core"
+ roPods[k].backend = vN
+ roPods[k].connection = vN+"1"
+ setConnection(client, "ro_vcore", v.backend, v.connection, v.ipAddr, 50057)
}
log.Info("Starting discovery monitoring")
startDiscoveryMonitor(client, coreGroups)
log.Info("Starting core monitoring")
- startCoreMonitor(client, clientset, coreFltr, coreGroups) // Never returns
+ startCoreMonitor(client, clientset, rwCoreFltr,
+ roCoreFltr, coreGroups, roPods) // Never returns
return
}