[VOL-3661] PPPoE IA application - initial commit
Change-Id: Idaf23f8736cba955fe8a3049b8fc9c85b3cd3ab9
Signed-off-by: Gustavo Silva <gsilva@furukawalatam.com>
diff --git a/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentShowUsersCommand.java b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentShowUsersCommand.java
new file mode 100644
index 0000000..a67ffca
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentShowUsersCommand.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.cli;
+
+import org.opencord.pppoeagent.PppoeAgentService;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Port;
+import org.onosproject.net.device.DeviceService;
+
+/**
+ * Shows the PPPoE sessions/users learned by the agent.
+ */
+@Service
+@Command(scope = "pppoe", name = "pppoe-users",
+ description = "Shows the PPPoE users")
+public class PppoeAgentShowUsersCommand extends AbstractShellCommand {
+
+ @Argument(index = 0, name = "mac", description = "MAC address related to PPPoE session.",
+ required = false, multiValued = false)
+ private String macStr = null;
+
+ @Override
+ protected void doExecute() {
+ MacAddress macAddress = null;
+ if (macStr != null && !macStr.isEmpty()) {
+ try {
+ macAddress = MacAddress.valueOf(macStr);
+ } catch (IllegalArgumentException e) {
+ log.error(e.getMessage());
+ return;
+ }
+ }
+
+ DeviceService deviceService = AbstractShellCommand.get(DeviceService.class);
+ PppoeAgentService pppoeAgentService = AbstractShellCommand.get(PppoeAgentService.class);
+
+ if (macAddress != null) {
+ PppoeSessionInfo singleInfo = pppoeAgentService.getSessionsMap().get(macAddress);
+ if (singleInfo != null) {
+ Port devicePort = deviceService.getPort(singleInfo.getClientCp());
+ printPppoeInfo(macAddress, singleInfo, devicePort);
+ } else {
+ print("No session information found for provided MAC address %s", macAddress.toString());
+ }
+ } else {
+ pppoeAgentService.getSessionsMap().forEach((mac, sessionInfo) -> {
+ final Port devicePortFinal = deviceService.getPort(sessionInfo.getClientCp());
+ printPppoeInfo(mac, sessionInfo, devicePortFinal);
+ });
+ }
+ }
+
+ private void printPppoeInfo(MacAddress macAddr, PppoeSessionInfo sessionInfo, Port devicePort) {
+ PPPoED.Type lastReceivedPkt = PPPoED.Type.getTypeByValue(sessionInfo.getPacketCode());
+ ConnectPoint cp = sessionInfo.getClientCp();
+ String subscriberId = devicePort != null ? devicePort.annotations().value(AnnotationKeys.PORT_NAME) :
+ "UNKNOWN";
+
+ print("MacAddress=%s,SessionId=%s,CurrentState=%s,LastReceivedPacket=%s,DeviceId=%s,PortNumber=%s," +
+ "SubscriberId=%s",
+ macAddr.toString(), String.valueOf(sessionInfo.getSessionId()),
+ sessionInfo.getCurrentState(), lastReceivedPkt.name(),
+ cp.deviceId().toString(), cp.port().toString(), subscriberId);
+ }
+}
diff --git a/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentStatsCommand.java b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentStatsCommand.java
new file mode 100644
index 0000000..885bf36
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentStatsCommand.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.cli;
+
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import java.util.Collections;
+import java.util.Map;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersIdentifier;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersStore;
+import org.opencord.pppoeagent.impl.PppoeAgentCounterNames;
+
+/**
+ * Display/Reset the PPPoE Agent application statistics.
+ */
+@Service
+@Command(scope = "pppoe", name = "pppoeagent-stats",
+ description = "Display or Reset the PPPoE Agent application statistics.")
+public class PppoeAgentStatsCommand extends AbstractShellCommand {
+ private static final String CONFIRM_PHRASE = "please";
+
+ @Option(name = "-r", aliases = "--reset", description = "Resets a specific counter.\n",
+ required = false, multiValued = false)
+ PppoeAgentCounterNames reset = null;
+
+ @Option(name = "-R", aliases = "--reset-all", description = "Resets all counters.\n",
+ required = false, multiValued = false)
+ boolean resetAll = false;
+
+ @Option(name = "-s", aliases = "--subscriberId", description = "Subscriber Id.\n",
+ required = false, multiValued = false)
+ String subscriberId = null;
+
+ @Option(name = "-p", aliases = "--please", description = "Confirmation phrase.",
+ required = false, multiValued = false)
+ String please = null;
+
+ @Argument(index = 0, name = "counter",
+ description = "The counter to display. In case not specified, all counters will be displayed.",
+ required = false, multiValued = false)
+ PppoeAgentCounterNames counter = null;
+
+ @Override
+ protected void doExecute() {
+ PppoeAgentCountersStore pppoeCounters = AbstractShellCommand.get(PppoeAgentCountersStore.class);
+
+ if ((subscriberId == null) || (subscriberId.equals("global"))) {
+ // All subscriber Ids
+ subscriberId = PppoeAgentEvent.GLOBAL_COUNTER;
+ }
+
+ if (resetAll || reset != null) {
+ if (please == null || !please.equals(CONFIRM_PHRASE)) {
+ print("WARNING: Be aware that you are going to reset the counters. " +
+ "Enter confirmation phrase to continue.");
+ return;
+ }
+ if (resetAll) {
+ // Reset all counters.
+ pppoeCounters.resetCounters(subscriberId);
+ } else {
+ // Reset the specified counter.
+ pppoeCounters.setCounter(subscriberId, reset, (long) 0);
+ }
+ } else {
+ Map<PppoeAgentCountersIdentifier, Long> countersMap = pppoeCounters.getCounters().counters();
+ if (countersMap.size() > 0) {
+ if (counter == null) {
+ String jsonString = "";
+ if (outputJson()) {
+ jsonString = String.format("{\"%s\":{", pppoeCounters.NAME);
+ } else {
+ print("%s [%s] :", pppoeCounters.NAME, subscriberId);
+ }
+ PppoeAgentCounterNames[] counters = PppoeAgentCounterNames.values();
+ for (int i = 0; i < counters.length; i++) {
+ PppoeAgentCounterNames counterType = counters[i];
+ Long value = countersMap.get(new PppoeAgentCountersIdentifier(subscriberId, counterType));
+ if (value == null) {
+ value = 0L;
+ }
+ if (outputJson()) {
+ jsonString += String.format("\"%s\":%d", counterType, value);
+ if (i < counters.length - 1) {
+ jsonString += ",";
+ }
+ } else {
+ printCounter(counterType, value);
+ }
+ }
+ if (outputJson()) {
+ jsonString += "}}";
+ print("%s", jsonString);
+ }
+ } else {
+ // Show only the specified counter
+ Long value = countersMap.get(new PppoeAgentCountersIdentifier(subscriberId, counter));
+ if (value == null) {
+ value = 0L;
+ }
+ if (outputJson()) {
+ print("{\"%s\":%d}", counter, value);
+ } else {
+ printCounter(counter, value);
+ }
+ }
+ } else {
+ print("No PPPoE Agent Counters were created yet for counter class [%s]",
+ PppoeAgentEvent.GLOBAL_COUNTER);
+ }
+ }
+ }
+
+ void printCounter(PppoeAgentCounterNames c, long value) {
+ // print in non-JSON format
+ print(" %s %s %-4d", c,
+ String.join("", Collections.nCopies(50 - c.toString().length(), ".")), value);
+ }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/cli/package-info.java b/app/src/main/java/org/opencord/pppoeagent/cli/package-info.java
new file mode 100644
index 0000000..515908f
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/cli/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * CLI commands for the PPPoE agent.
+ */
+package org.opencord.pppoeagent.cli;
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/pppoeagent/impl/OsgiPropertyConstants.java
new file mode 100644
index 0000000..65c8310
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/OsgiPropertyConstants.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.impl;
+
+/**
+ * Constants for default values of configurable properties.
+ */
+public final class OsgiPropertyConstants {
+
+ private OsgiPropertyConstants() {
+ }
+
+ public static final String ENABLE_CIRCUIT_ID_VALIDATION = "enableCircuitIdValidation";
+ public static final boolean ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT = true;
+
+ public static final String PPPOE_COUNTERS_TOPIC = "pppoeCountersTopic";
+ public static final String PPPOE_COUNTERS_TOPIC_DEFAULT = "onos.pppoe.stats.kpis";
+
+ public static final String PUBLISH_COUNTERS_RATE = "publishCountersRate";
+ public static final int PUBLISH_COUNTERS_RATE_DEFAULT = 10;
+
+ public static final String PPPOE_MAX_MTU = "pppoeMaxMtu";
+ public static final int PPPOE_MAX_MTU_DEFAULT = 1500;
+
+ public static final String PACKET_PROCESSOR_THREADS = "packetProcessorThreads";
+ public static final int PACKET_PROCESSOR_THREADS_DEFAULT = 10;
+
+ public static final String SYNC_COUNTERS_RATE = "syncCountersRate";
+ public static final int SYNC_COUNTERS_RATE_DEFAULT = 5;
+}
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgent.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgent.java
new file mode 100644
index 0000000..67c71da
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgent.java
@@ -0,0 +1,929 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+
+import com.google.common.collect.Sets;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentListener;
+import org.opencord.pppoeagent.PppoeAgentService;
+import org.opencord.pppoeagent.PppoeAgentStoreDelegate;
+import org.opencord.pppoeagent.PPPoEDVendorSpecificTag;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Serializer;
+import com.google.common.collect.ImmutableMap;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+
+import org.onlab.util.KryoNamespace;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onlab.packet.PPPoEDTag;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipListener;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketService;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.opencord.sadis.UniTagInformation;
+import org.opencord.pppoeagent.util.CircuitIdBuilder;
+import org.opencord.pppoeagent.util.CircuitIdFieldName;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import org.onosproject.store.service.Versioned;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADI;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADO;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADR;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADS;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADT;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_AC_SYSTEM_ERROR;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_GENERIC_ERROR;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_SERVICE_NAME_ERROR;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_VENDOR_SPECIFIC;
+
+import static org.onlab.util.Tools.getIntegerProperty;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
+
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PPPOE_MAX_MTU;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PPPOE_MAX_MTU_DEFAULT;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.ENABLE_CIRCUIT_ID_VALIDATION;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PACKET_PROCESSOR_THREADS;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PACKET_PROCESSOR_THREADS_DEFAULT;
+
+/**
+ * PPPoE Intermediate Agent application.
+ */
+@Component(immediate = true,
+property = {
+ PPPOE_MAX_MTU + ":Integer=" + PPPOE_MAX_MTU_DEFAULT,
+ ENABLE_CIRCUIT_ID_VALIDATION + ":Boolean=" + ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT,
+ PACKET_PROCESSOR_THREADS + ":Integer=" + PACKET_PROCESSOR_THREADS_DEFAULT,
+})
+public class PppoeAgent
+ extends AbstractListenerManager<PppoeAgentEvent, PppoeAgentListener>
+ implements PppoeAgentService {
+ private static final String APP_NAME = "org.opencord.pppoeagent";
+ private static final short QINQ_VID_NONE = (short) -1;
+
+ private final InternalConfigListener cfgListener = new InternalConfigListener();
+ private final Set<ConfigFactory> factories = ImmutableSet.of(
+ new ConfigFactory<>(APP_SUBJECT_FACTORY, PppoeAgentConfig.class, "pppoeagent") {
+ @Override
+ public PppoeAgentConfig createConfig() {
+ return new PppoeAgentConfig();
+ }
+ }
+ );
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected NetworkConfigRegistry cfgService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected PacketService packetService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ComponentConfigService componentConfigService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected SadisService sadisService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected PppoeAgentCountersStore pppoeAgentCounters;
+
+ // OSGi Properties
+ protected int pppoeMaxMtu = PPPOE_MAX_MTU_DEFAULT;
+ protected boolean enableCircuitIdValidation = ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT;
+ protected int packetProcessorThreads = PACKET_PROCESSOR_THREADS_DEFAULT;
+
+ private ApplicationId appId;
+ private InnerDeviceListener deviceListener = new InnerDeviceListener();
+ private InnerMastershipListener changeListener = new InnerMastershipListener();
+ private PppoeAgentPacketProcessor pppoeAgentPacketProcessor = new PppoeAgentPacketProcessor();
+ protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
+ private PppoeAgentStoreDelegate delegate = new InnerPppoeAgentStoreDelegate();
+
+ Set<ConnectPoint> pppoeConnectPoints;
+ protected AtomicReference<ConnectPoint> pppoeServerConnectPoint = new AtomicReference<>();
+ protected boolean useOltUplink = false;
+
+ static ConsistentMap<MacAddress, PppoeSessionInfo> sessionsMap;
+
+ @Override
+ public Map<MacAddress, PppoeSessionInfo> getSessionsMap() {
+ return ImmutableMap.copyOf(sessionsMap.asJavaMap());
+ }
+
+ @Override
+ public void clearSessionsMap() {
+ sessionsMap.clear();
+ }
+
+ private final ArrayList<CircuitIdFieldName> circuitIdfields = new ArrayList<>(Arrays.asList(
+ CircuitIdFieldName.AcessNodeIdentifier,
+ CircuitIdFieldName.Slot,
+ CircuitIdFieldName.Port,
+ CircuitIdFieldName.OnuSerialNumber));
+
+ protected ExecutorService packetProcessorExecutor;
+
+ @Activate
+ protected void activate(ComponentContext context) {
+ eventDispatcher.addSink(PppoeAgentEvent.class, listenerRegistry);
+
+ appId = coreService.registerApplication(APP_NAME);
+ cfgService.addListener(cfgListener);
+ componentConfigService.registerProperties(getClass());
+
+ KryoNamespace serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(PppoeSessionInfo.class)
+ .register(MacAddress.class)
+ .register(SubscriberAndDeviceInformation.class)
+ .register(UniTagInformation.class)
+ .register(ConnectPoint.class)
+ .build();
+
+ sessionsMap = storageService.<MacAddress, PppoeSessionInfo>consistentMapBuilder()
+ .withName("pppoeagent-sessions")
+ .withSerializer(Serializer.using(serializer))
+ .withApplicationId(appId)
+ .build();
+
+ factories.forEach(cfgService::registerConfigFactory);
+ deviceService.addListener(deviceListener);
+ subsService = sadisService.getSubscriberInfoService();
+ mastershipService.addListener(changeListener);
+ pppoeAgentCounters.setDelegate(delegate);
+ updateConfig();
+ packetService.addProcessor(pppoeAgentPacketProcessor, PacketProcessor.director(0));
+ if (context != null) {
+ modified(context);
+ }
+ log.info("PPPoE Intermediate Agent Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ cfgService.removeListener(cfgListener);
+ factories.forEach(cfgService::unregisterConfigFactory);
+ packetService.removeProcessor(pppoeAgentPacketProcessor);
+ packetProcessorExecutor.shutdown();
+ componentConfigService.unregisterProperties(getClass(), false);
+ deviceService.removeListener(deviceListener);
+ eventDispatcher.removeSink(PppoeAgentEvent.class);
+ pppoeAgentCounters.unsetDelegate(delegate);
+ log.info("PPPoE Intermediate Agent Stopped");
+ }
+
+ private void updateConfig() {
+ PppoeAgentConfig cfg = cfgService.getConfig(appId, PppoeAgentConfig.class);
+ if (cfg == null) {
+ log.warn("PPPoE server info not available");
+ return;
+ }
+
+ synchronized (this) {
+ pppoeConnectPoints = Sets.newConcurrentHashSet(cfg.getPppoeServerConnectPoint());
+ }
+ useOltUplink = cfg.getUseOltUplinkForServerPktInOut();
+ }
+
+ /**
+ * Returns whether the passed port is the uplink port of the olt device.
+ */
+ private boolean isUplinkPortOfOlt(DeviceId dId, Port p) {
+ log.debug("isUplinkPortOfOlt: DeviceId: {} Port: {}", dId, p);
+ if (!mastershipService.isLocalMaster(dId)) {
+ return false;
+ }
+
+ Device d = deviceService.getDevice(dId);
+ SubscriberAndDeviceInformation deviceInfo = subsService.get(d.serialNumber());
+ if (deviceInfo != null) {
+ return (deviceInfo.uplinkPort() == p.number().toLong());
+ }
+
+ return false;
+ }
+
+ /**
+ * Returns the connectPoint which is the uplink port of the OLT.
+ */
+ private ConnectPoint getUplinkConnectPointOfOlt(DeviceId dId) {
+ Device d = deviceService.getDevice(dId);
+ SubscriberAndDeviceInformation deviceInfo = subsService.get(d.serialNumber());
+ log.debug("getUplinkConnectPointOfOlt DeviceId: {} devInfo: {}", dId, deviceInfo);
+ if (deviceInfo != null) {
+ PortNumber pNum = PortNumber.portNumber(deviceInfo.uplinkPort());
+ Port port = deviceService.getPort(d.id(), pNum);
+ if (port != null) {
+ return new ConnectPoint(d.id(), pNum);
+ }
+ }
+ return null;
+ }
+
+ @Modified
+ protected void modified(ComponentContext context) {
+ Dictionary<?, ?> properties = context.getProperties();
+
+ Integer newPppoeMaxMtu = getIntegerProperty(properties, PPPOE_MAX_MTU);
+ if (newPppoeMaxMtu != null) {
+ if (newPppoeMaxMtu != pppoeMaxMtu && newPppoeMaxMtu >= 0) {
+ log.info("PPPPOE MTU modified from {} to {}", pppoeMaxMtu, newPppoeMaxMtu);
+ pppoeMaxMtu = newPppoeMaxMtu;
+ } else if (newPppoeMaxMtu < 0) {
+ log.error("Invalid newPppoeMaxMtu : {}, defaulting to 1492", newPppoeMaxMtu);
+ pppoeMaxMtu = PPPOE_MAX_MTU_DEFAULT;
+ }
+ }
+
+ Boolean newEnableCircuitIdValidation = Tools.isPropertyEnabled(properties, ENABLE_CIRCUIT_ID_VALIDATION);
+ if (newEnableCircuitIdValidation != null) {
+ if (enableCircuitIdValidation != newEnableCircuitIdValidation) {
+ log.info("Property enableCircuitIdValidation modified from {} to {}",
+ enableCircuitIdValidation, newEnableCircuitIdValidation);
+ enableCircuitIdValidation = newEnableCircuitIdValidation;
+ }
+ }
+
+ String s = Tools.get(properties, PACKET_PROCESSOR_THREADS);
+
+ int oldpacketProcessorThreads = packetProcessorThreads;
+ packetProcessorThreads = Strings.isNullOrEmpty(s) ? oldpacketProcessorThreads
+ : Integer.parseInt(s.trim());
+ if (packetProcessorExecutor == null || oldpacketProcessorThreads != packetProcessorThreads) {
+ if (packetProcessorExecutor != null) {
+ packetProcessorExecutor.shutdown();
+ }
+ packetProcessorExecutor = newFixedThreadPool(packetProcessorThreads,
+ groupedThreads("onos/pppoe",
+ "pppoe-packet-%d", log));
+ }
+ }
+
+ /**
+ * Selects a connect point through an available device for which it is the master.
+ */
+ private void selectServerConnectPoint() {
+ synchronized (this) {
+ pppoeServerConnectPoint.set(null);
+ if (pppoeConnectPoints != null) {
+ // find a connect point through a device for which we are master
+ for (ConnectPoint cp: pppoeConnectPoints) {
+ if (mastershipService.isLocalMaster(cp.deviceId())) {
+ if (deviceService.isAvailable(cp.deviceId())) {
+ pppoeServerConnectPoint.set(cp);
+ }
+ log.info("PPPOE connectPoint selected is {}", cp);
+ break;
+ }
+ }
+ }
+ log.info("PPPOE Server connectPoint is {}", pppoeServerConnectPoint.get());
+ if (pppoeServerConnectPoint.get() == null) {
+ log.error("Master of none, can't relay PPPOE messages to server");
+ }
+ }
+ }
+
+ private SubscriberAndDeviceInformation getSubscriber(ConnectPoint cp) {
+ Port p = deviceService.getPort(cp);
+ String subscriberId = p.annotations().value(AnnotationKeys.PORT_NAME);
+ return subsService.get(subscriberId);
+ }
+
+ private SubscriberAndDeviceInformation getDevice(PacketContext context) {
+ String serialNo = deviceService.getDevice(context.inPacket().
+ receivedFrom().deviceId()).serialNumber();
+
+ return subsService.get(serialNo);
+ }
+
+ private UniTagInformation getUnitagInformationFromPacketContext(PacketContext context,
+ SubscriberAndDeviceInformation sub) {
+ return sub.uniTagList()
+ .stream()
+ .filter(u -> u.getPonCTag().toShort() == context.inPacket().parsed().getVlanID())
+ .findFirst()
+ .orElse(null);
+ }
+
+ private boolean removeSessionsByConnectPoint(ConnectPoint cp) {
+ boolean removed = false;
+ for (MacAddress key : sessionsMap.keySet()) {
+ PppoeSessionInfo entry = sessionsMap.asJavaMap().get(key);
+ if (entry.getClientCp().equals(cp)) {
+ sessionsMap.remove(key);
+ removed = true;
+ }
+ }
+ return removed;
+ }
+
+ private boolean removeSessionsByDevice(DeviceId devid) {
+ boolean removed = false;
+ for (MacAddress key : sessionsMap.keySet()) {
+ PppoeSessionInfo entry = sessionsMap.asJavaMap().get(key);
+ if (entry.getClientCp().deviceId().equals(devid)) {
+ sessionsMap.remove(key);
+ removed = true;
+ }
+ }
+ return removed;
+ }
+
+ private class PppoeAgentPacketProcessor implements PacketProcessor {
+ @Override
+ public void process(PacketContext context) {
+ packetProcessorExecutor.execute(() -> {
+ processInternal(context);
+ });
+ }
+
+ private void processInternal(PacketContext context) {
+ Ethernet packet = (Ethernet) context.inPacket().parsed().clone();
+ if (packet.getEtherType() == Ethernet.TYPE_PPPOED) {
+ processPppoedPacket(context, packet);
+ }
+ }
+
+ private void processPppoedPacket(PacketContext context, Ethernet packet) {
+ PPPoED pppoed = (PPPoED) packet.getPayload();
+ if (pppoed == null) {
+ log.warn("PPPoED payload is null");
+ return;
+ }
+
+ final byte pppoedCode = pppoed.getCode();
+ final short sessionId = pppoed.getSessionId();
+ final MacAddress clientMacAddress;
+ final ConnectPoint packetCp = context.inPacket().receivedFrom();
+ boolean serverMessage = false;
+
+ // Get the client MAC address
+ switch (pppoedCode) {
+ case PPPOED_CODE_PADT: {
+ if (sessionsMap.containsKey(packet.getDestinationMAC())) {
+ clientMacAddress = packet.getDestinationMAC();
+ serverMessage = true;
+ } else if (sessionsMap.containsKey(packet.getSourceMAC())) {
+ clientMacAddress = packet.getSourceMAC();
+ } else {
+ // In the unlikely case of receiving a PADT without an existing session
+ log.warn("PADT received for unknown session. Dropping packet.");
+ return;
+ }
+ break;
+ }
+ case PPPOED_CODE_PADI:
+ case PPPOED_CODE_PADR: {
+ clientMacAddress = packet.getSourceMAC();
+ break;
+ }
+ default: {
+ clientMacAddress = packet.getDestinationMAC();
+ serverMessage = true;
+ break;
+ }
+ }
+
+ SubscriberAndDeviceInformation subsInfo;
+ if (serverMessage) {
+ if (!sessionsMap.containsKey(clientMacAddress)) {
+ log.error("PPPoED message received from server without an existing session. Message not relayed.");
+ return;
+ } else {
+ PppoeSessionInfo sessInfo = sessionsMap.get(clientMacAddress).value();
+ subsInfo = getSubscriber(sessInfo.getClientCp());
+ }
+ } else {
+ subsInfo = getSubscriber(packetCp);
+ }
+
+ if (subsInfo == null) {
+ log.error("No Sadis info for subscriber on connect point {}. Message not relayed.", packetCp);
+ return;
+ }
+
+ log.trace("{} received from {} at {} with client mac: {}",
+ PPPoED.Type.getTypeByValue(pppoedCode).toString(),
+ serverMessage ? "server" : "client", packetCp, clientMacAddress);
+
+ if (log.isTraceEnabled()) {
+ log.trace("PPPoED message received from {} at {} {}",
+ serverMessage ? "server" : "client", packetCp, packet);
+ }
+
+ // In case of PADI, force the removal of the previous session entry
+ if ((pppoedCode == PPPOED_CODE_PADI) && (sessionsMap.containsKey(clientMacAddress))) {
+ log.trace("PADI received from MAC: {} with existing session data. Removing the existing data.",
+ clientMacAddress.toString());
+ sessionsMap.remove(clientMacAddress);
+ }
+
+ // Fill the session map entry
+ PppoeSessionInfo sessionInfo;
+ if (!sessionsMap.containsKey(clientMacAddress)) {
+ if (!serverMessage) {
+ ConnectPoint serverCp = getServerConnectPoint(packetCp.deviceId());
+ SubscriberAndDeviceInformation subscriber = getSubscriber(packetCp);
+ sessionInfo = new PppoeSessionInfo(packetCp, serverCp, pppoedCode,
+ sessionId, subscriber, clientMacAddress);
+ sessionsMap.put(clientMacAddress, sessionInfo);
+ } else {
+ // This case was already covered.
+ return;
+ }
+ } else {
+ sessionInfo = sessionsMap.get(clientMacAddress).value();
+ }
+
+ switch (pppoedCode) {
+ case PPPOED_CODE_PADI:
+ case PPPOED_CODE_PADR:
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ pppoedCode == PPPOED_CODE_PADI ? PppoeAgentCounterNames.PADI : PppoeAgentCounterNames.PADR);
+
+ Ethernet padir = processPacketFromClient(context, packet, pppoed, sessionInfo, clientMacAddress);
+ if (padir != null) {
+ if (padir.serialize().length <= pppoeMaxMtu) {
+ forwardPacketToServer(padir, sessionInfo);
+ } else {
+ log.debug("MTU message size: {} exceeded configured pppoeMaxMtu: {}. Dropping Packet.",
+ padir.serialize().length, pppoeMaxMtu);
+ forwardPacketToClient(errorToClient(packet, pppoed, "MTU message size exceeded"),
+ sessionInfo, clientMacAddress);
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ PppoeAgentCounterNames.MTU_EXCEEDED);
+ }
+ }
+ break;
+ case PPPOED_CODE_PADO:
+ case PPPOED_CODE_PADS:
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ pppoedCode == PPPOED_CODE_PADO ? PppoeAgentCounterNames.PADO : PppoeAgentCounterNames.PADS);
+ Ethernet pados = processPacketFromServer(packet, pppoed, sessionInfo, clientMacAddress);
+ if (pados != null) {
+ forwardPacketToClient(pados, sessionInfo, clientMacAddress);
+ }
+ break;
+ case PPPOED_CODE_PADT:
+ if (serverMessage) {
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ PppoeAgentCounterNames.PADT_FROM_SERVER);
+ forwardPacketToClient(packet, sessionInfo, clientMacAddress);
+ } else {
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ PppoeAgentCounterNames.PADT_FROM_CLIENT);
+ forwardPacketToServer(packet, sessionInfo);
+ }
+
+ String reason = "";
+ PPPoEDTag genericErrorTag = pppoed.getTags()
+ .stream()
+ .filter(tag -> tag.getType() == PPPOED_TAG_GENERIC_ERROR)
+ .findFirst()
+ .orElse(null);
+
+ if (genericErrorTag != null) {
+ reason = new String(genericErrorTag.getValue(), StandardCharsets.UTF_8);
+ }
+ log.debug("PADT sessionId:{} client MAC:{} Terminate reason:{}.",
+ Integer.toHexString(sessionId & 0xFFFF), clientMacAddress, reason);
+
+ boolean knownSessionId = sessionInfo.getSessionId() == sessionId;
+ if (knownSessionId) {
+ PppoeSessionInfo removedSessionInfo = Versioned
+ .valueOrNull(sessionsMap.remove(clientMacAddress));
+ if (removedSessionInfo != null) {
+ post(new PppoeAgentEvent(PppoeAgentEvent.Type.TERMINATE, removedSessionInfo,
+ packetCp, clientMacAddress, reason));
+ }
+ } else {
+ log.warn("PADT received for a known MAC address but for unknown session.");
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ private Ethernet processPacketFromClient(PacketContext context, Ethernet ethernetPacket, PPPoED pppoed,
+ PppoeSessionInfo sessionInfo, MacAddress clientMacAddress) {
+ byte pppoedCode = pppoed.getCode();
+
+ sessionInfo.setPacketCode(pppoedCode);
+ sessionsMap.put(clientMacAddress, sessionInfo);
+
+ // Update Counters
+ for (PPPoEDTag tag : pppoed.getTags()) {
+ if (tag.getType() == PPPOED_TAG_GENERIC_ERROR) {
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ PppoeAgentCounterNames.GENERIC_ERROR_FROM_CLIENT);
+ break;
+ }
+ }
+
+ Ethernet ethFwd = ethernetPacket;
+
+ // If this is a PADI packet, there'll be a START event.
+ if (pppoedCode == PPPOED_CODE_PADI) {
+ post(new PppoeAgentEvent(PppoeAgentEvent.Type.START, sessionInfo, sessionInfo.getClientCp(),
+ clientMacAddress));
+ }
+
+ // Creates the vendor specific tag.
+ String circuitId = getCircuitId(sessionInfo.getClientCp());
+ if (circuitId == null) {
+ log.error("Failed to build circuid-id for client on connect point {}. Dropping packet.",
+ sessionInfo.getClientCp());
+ return null;
+ }
+
+ // Checks whether the circuit-id is valid, if it's not it drops the packet.
+ if (!isCircuitIdValid(circuitId, sessionInfo.getSubscriber())) {
+ log.warn("Invalid circuit ID, dropping packet.");
+ PppoeAgentEvent invalidCidEvent = new PppoeAgentEvent(PppoeAgentEvent.Type.INVALID_CID, sessionInfo,
+ context.inPacket().receivedFrom(), clientMacAddress);
+ post(invalidCidEvent);
+ return null;
+ }
+
+ String remoteId = sessionInfo.getSubscriber().remoteId();
+ byte[] vendorSpecificTag = new PPPoEDVendorSpecificTag(circuitId, remoteId).toByteArray();
+
+ // According to TR-101, R-149 (by Broadband Forum), agent must REPLACE vendor-specific tag that may come
+ // from client message with its own tag.
+ // The following block ensures that agent removes any previous vendor-specific tag.
+ List<PPPoEDTag> originalTags = pppoed.getTags();
+ if (originalTags != null) {
+ PPPoEDTag originalVendorSpecificTag = originalTags.stream()
+ .filter(tag -> tag.getType() == PPPOED_TAG_VENDOR_SPECIFIC)
+ .findFirst()
+ .orElse(null);
+
+ if (originalVendorSpecificTag != null) {
+ int tagToRemoveLength = originalVendorSpecificTag.getLength();
+ originalTags.removeIf(tag -> tag.getType() == PPPOED_TAG_VENDOR_SPECIFIC);
+ pppoed.setPayloadLength((short) (pppoed.getPayloadLength() - tagToRemoveLength));
+ }
+ }
+
+ pppoed.setTag(PPPOED_TAG_VENDOR_SPECIFIC, vendorSpecificTag);
+
+ ethFwd.setPayload(pppoed);
+ ethFwd.setQinQTPID(Ethernet.TYPE_VLAN);
+
+ UniTagInformation uniTagInformation = getUnitagInformationFromPacketContext(context,
+ sessionInfo.getSubscriber());
+ if (uniTagInformation == null) {
+ log.warn("Missing service information for connectPoint {} / cTag {}",
+ context.inPacket().receivedFrom(), context.inPacket().parsed().getVlanID());
+ return null;
+ }
+ ethFwd.setQinQVID(uniTagInformation.getPonSTag().toShort());
+ ethFwd.setPad(true);
+ return ethFwd;
+ }
+
+ private Ethernet processPacketFromServer(Ethernet ethernetPacket, PPPoED pppoed,
+ PppoeSessionInfo sessionInfo, MacAddress clientMacAddress) {
+ // Update counters
+ List<PPPoEDTag> tags = pppoed.getTags();
+ for (PPPoEDTag tag : tags) {
+ switch (tag.getType()) {
+ case PPPOED_TAG_GENERIC_ERROR:
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ PppoeAgentCounterNames.GENERIC_ERROR_FROM_SERVER);
+ break;
+ case PPPOED_TAG_SERVICE_NAME_ERROR:
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ PppoeAgentCounterNames.SERVICE_NAME_ERROR);
+ break;
+ case PPPOED_TAG_AC_SYSTEM_ERROR:
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ PppoeAgentCounterNames.AC_SYSTEM_ERROR);
+ break;
+ default:
+ break;
+ }
+ }
+
+ byte pppoedCode = pppoed.getCode();
+
+ if (pppoedCode == PPPOED_CODE_PADS) {
+ log.debug("PADS sessionId:{} client MAC:{}", Integer.toHexString(pppoed.getSessionId() & 0xFFFF),
+ clientMacAddress);
+ sessionInfo.setSessionId(pppoed.getSessionId());
+ }
+ sessionInfo.setPacketCode(pppoedCode);
+ sessionsMap.put(clientMacAddress, sessionInfo);
+
+ PppoeAgentEvent.Type eventType = pppoedCode == PPPOED_CODE_PADS ?
+ PppoeAgentEvent.Type.SESSION_ESTABLISHED :
+ PppoeAgentEvent.Type.NEGOTIATION;
+
+ post(new PppoeAgentEvent(eventType, sessionInfo, sessionInfo.getClientCp(), clientMacAddress));
+
+ ethernetPacket.setQinQVID(QINQ_VID_NONE);
+ ethernetPacket.setPad(true);
+ return ethernetPacket;
+ }
+
+ private void updatePppoeAgentCountersStore(SubscriberAndDeviceInformation sub,
+ PppoeAgentCounterNames counterType) {
+ // Update global counter stats
+ pppoeAgentCounters.incrementCounter(PppoeAgentEvent.GLOBAL_COUNTER, counterType);
+ if (sub == null) {
+ log.warn("Counter not updated as subscriber info not found.");
+ } else {
+ // Update subscriber counter stats
+ pppoeAgentCounters.incrementCounter(sub.id(), counterType);
+ }
+ }
+
+ private String getCircuitId(ConnectPoint cp) {
+ return new CircuitIdBuilder()
+ .setConnectPoint(cp)
+ .setDeviceService(deviceService)
+ .setSubsService(subsService)
+ .setCircuitIdConfig(circuitIdfields)
+ .addCustomSeparator(CircuitIdFieldName.AcessNodeIdentifier, " ")
+ .addCustomSeparator(CircuitIdFieldName.Port, ":")
+ .build();
+ }
+
+ protected ConnectPoint getServerConnectPoint(DeviceId deviceId) {
+ ConnectPoint serverCp;
+ if (!useOltUplink) {
+ serverCp = pppoeServerConnectPoint.get();
+ } else {
+ serverCp = getUplinkConnectPointOfOlt(deviceId);
+ }
+ return serverCp;
+ }
+
+ private boolean isCircuitIdValid(String cId, SubscriberAndDeviceInformation entry) {
+ if (!enableCircuitIdValidation) {
+ log.debug("Circuit ID validation is disabled.");
+ return true;
+ }
+
+ if (entry == null) {
+ log.error("SubscriberAndDeviceInformation cannot be null.");
+ return false;
+ }
+
+ if (entry.circuitId() == null || entry.circuitId().isEmpty()) {
+ log.debug("Circuit ID not configured in SADIS entry. No check is done.");
+ return true;
+ } else {
+ if (cId.equals(entry.circuitId())) {
+ log.info("Circuit ID in packet: {} matched the configured entry on SADIS.", cId);
+ return true;
+ } else {
+ log.warn("Circuit ID in packet: {} did not match the configured entry on SADIS: {}.",
+ cId, entry.circuitId());
+ return false;
+ }
+ }
+ }
+
+ private void forwardPacketToServer(Ethernet packet, PppoeSessionInfo sessionInfo) {
+ ConnectPoint toSendTo = sessionInfo.getServerCp();
+ if (toSendTo != null) {
+ log.info("Sending PPPOE packet to server at {}", toSendTo);
+ TrafficTreatment t = DefaultTrafficTreatment.builder().setOutput(toSendTo.port()).build();
+ OutboundPacket o = new DefaultOutboundPacket(toSendTo.deviceId(), t,
+ ByteBuffer.wrap(packet.serialize()));
+ if (log.isTraceEnabled()) {
+ log.trace("Relaying packet to pppoe server at {} {}", toSendTo, packet);
+ }
+ packetService.emit(o);
+
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ PppoeAgentCounterNames.PPPOED_PACKETS_TO_SERVER);
+ } else {
+ log.error("No connect point to send msg to PPPOE Server");
+ }
+ }
+
+ private void forwardPacketToClient(Ethernet packet, PppoeSessionInfo sessionInfo, MacAddress clientMacAddress) {
+ ConnectPoint subCp = sessionInfo.getClientCp();
+ if (subCp == null) {
+ log.error("Dropping PPPOE packet, can't find a connectpoint for MAC {}", clientMacAddress);
+ return;
+ }
+
+ log.info("Sending PPPOE packet to client at {}", subCp);
+ TrafficTreatment t = DefaultTrafficTreatment.builder()
+ .setOutput(subCp.port()).build();
+ OutboundPacket o = new DefaultOutboundPacket(
+ subCp.deviceId(), t, ByteBuffer.wrap(packet.serialize()));
+ if (log.isTraceEnabled()) {
+ log.trace("Relaying packet to pppoe client at {} {}", subCp, packet);
+ }
+
+ packetService.emit(o);
+
+ updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+ PppoeAgentCounterNames.PPPOED_PACKETS_FROM_SERVER);
+ }
+
+ private Ethernet errorToClient(Ethernet packet, PPPoED p, String errString) {
+ PPPoED err = new PPPoED();
+ err.setVersion(p.getVersion());
+ err.setType(p.getType());
+ switch (p.getCode()) {
+ case PPPOED_CODE_PADI:
+ err.setCode(PPPOED_CODE_PADO);
+ break;
+ case PPPOED_CODE_PADR:
+ err.setCode(PPPOED_CODE_PADS);
+ break;
+ default:
+ break;
+ }
+ err.setCode(p.getCode());
+ err.setSessionId(p.getSessionId());
+ err.setTag(PPPOED_TAG_GENERIC_ERROR, errString.getBytes(StandardCharsets.UTF_8));
+
+ Ethernet ethPacket = new Ethernet();
+ ethPacket.setPayload(err);
+ ethPacket.setSourceMACAddress(packet.getDestinationMACAddress());
+ ethPacket.setDestinationMACAddress(packet.getSourceMACAddress());
+ ethPacket.setQinQVID(QINQ_VID_NONE);
+ ethPacket.setPad(true);
+
+ return ethPacket;
+ }
+ }
+
+ private class InternalConfigListener implements NetworkConfigListener {
+ @Override
+ public void event(NetworkConfigEvent event) {
+
+ if ((event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
+ event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED) &&
+ event.configClass().equals(PppoeAgentConfig.class)) {
+ updateConfig();
+ log.info("Reconfigured");
+ }
+ }
+ }
+
+ /**
+ * Handles Mastership changes for the devices which connect to the PPPOE server.
+ */
+ private class InnerMastershipListener implements MastershipListener {
+ @Override
+ public void event(MastershipEvent event) {
+ if (!useOltUplink) {
+ if (pppoeServerConnectPoint.get() != null &&
+ pppoeServerConnectPoint.get().deviceId().equals(event.subject())) {
+ log.trace("Mastership Event recevived for {}", event.subject());
+ // mastership of the device for our connect point has changed, reselect
+ selectServerConnectPoint();
+ }
+ }
+ }
+ }
+
+ private class InnerDeviceListener implements DeviceListener {
+ @Override
+ public void event(DeviceEvent event) {
+ DeviceId devId = event.subject().id();
+
+ if (log.isTraceEnabled() && !event.type().equals(DeviceEvent.Type.PORT_STATS_UPDATED)) {
+ log.trace("Device Event received for {} event {}", event.subject(), event.type());
+ }
+
+ // Handle events from any other device
+ switch (event.type()) {
+ case PORT_UPDATED:
+ if (!event.port().isEnabled()) {
+ ConnectPoint cp = new ConnectPoint(devId, event.port().number());
+ removeSessionsByConnectPoint(cp);
+ }
+ break;
+ case PORT_REMOVED:
+ // Remove all entries related to this port from sessions map
+ ConnectPoint cp = new ConnectPoint(devId, event.port().number());
+ removeSessionsByConnectPoint(cp);
+ break;
+ case DEVICE_REMOVED:
+ // Remove all entries related to this device from sessions map
+ removeSessionsByDevice(devId);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ private class InnerPppoeAgentStoreDelegate implements PppoeAgentStoreDelegate {
+ @Override
+ public void notify(PppoeAgentEvent event) {
+ if (event.type().equals(PppoeAgentEvent.Type.STATS_UPDATE)) {
+ PppoeAgentEvent toPost = event;
+ if (event.getSubscriberId() != null) {
+ // infuse the event with the allocation info before posting
+ PppoeSessionInfo info = Versioned.valueOrNull(
+ sessionsMap.stream().filter(entry -> event.getSubscriberId()
+ .equals(entry.getValue().value().getSubscriber().id()))
+ .map(Map.Entry::getValue)
+ .findFirst()
+ .orElse(null));
+ if (info == null) {
+ log.debug("Not handling STATS_UPDATE event for session that no longer exists. {}.", event);
+ return;
+ }
+
+ toPost = new PppoeAgentEvent(event.type(), info, event.getCounterName(), event.getCounterValue(),
+ info.getClientMac(), event.getSubscriberId());
+ }
+ post(toPost);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentConfig.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentConfig.java
new file mode 100644
index 0000000..5d14c76
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.collect.ImmutableSet;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.config.Config;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class PppoeAgentConfig extends Config<ApplicationId> {
+ private static final String PPPOE_CONNECT_POINTS = "pppoeServerConnectPoints";
+ private static final String USE_OLT_ULPORT_FOR_PKT_INOUT = "useOltUplinkForServerPktInOut";
+
+ protected static final Boolean DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT = true;
+
+ @Override
+ public boolean isValid() {
+ return hasOnlyFields(PPPOE_CONNECT_POINTS, USE_OLT_ULPORT_FOR_PKT_INOUT);
+ }
+
+ /**
+ * Returns whether the app would use the uplink port of OLT for sending/receving
+ * messages to/from the server.
+ *
+ * @return true if OLT uplink port is to be used, false otherwise
+ */
+ public boolean getUseOltUplinkForServerPktInOut() {
+ if (object == null) {
+ return DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT;
+ }
+ if (!object.has(USE_OLT_ULPORT_FOR_PKT_INOUT)) {
+ return DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT;
+ }
+ return object.path(USE_OLT_ULPORT_FOR_PKT_INOUT).asBoolean();
+ }
+
+ /**
+ * Returns the pppoe server connect points.
+ *
+ * @return pppoe server connect points
+ */
+ public Set<ConnectPoint> getPppoeServerConnectPoint() {
+ if (object == null) {
+ return new HashSet<ConnectPoint>();
+ }
+
+ if (!object.has(PPPOE_CONNECT_POINTS)) {
+ return ImmutableSet.of();
+ }
+
+ ImmutableSet.Builder<ConnectPoint> builder = ImmutableSet.builder();
+ ArrayNode arrayNode = (ArrayNode) object.path(PPPOE_CONNECT_POINTS);
+ for (JsonNode jsonNode : arrayNode) {
+ String portName = jsonNode.asText(null);
+ if (portName == null) {
+ return null;
+ }
+ try {
+ builder.add(ConnectPoint.deviceConnectPoint(portName));
+ } catch (IllegalArgumentException e) {
+ return null;
+ }
+ }
+ return builder.build();
+ }
+
+
+}
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCounterNames.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCounterNames.java
new file mode 100644
index 0000000..0d58078
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCounterNames.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+/**
+ * Represents PPPoE agent counters type.
+ */
+public enum PppoeAgentCounterNames {
+ /**
+ * Number of PADI messages received from client.
+ */
+ PADI,
+ /**
+ * Number of PADO messages received from server.
+ */
+ PADO,
+ /**
+ * Number of PADR messages received from client.
+ */
+ PADR,
+ /**
+ * Number of PADS messages received from server.
+ */
+ PADS,
+ /**
+ * Number of PADT messages received from server.
+ */
+ PADT_FROM_SERVER,
+ /**
+ * Number of PADT messages received from client.
+ */
+ PADT_FROM_CLIENT,
+ /**
+ * Number of PPPoED messages sent to server.
+ */
+ PPPOED_PACKETS_TO_SERVER,
+ /**
+ * Number of PPPoED messages received from server.
+ */
+ PPPOED_PACKETS_FROM_SERVER,
+ /**
+ * Number of MTU Exceeded errors generated by the PPPoED agent.
+ */
+ MTU_EXCEEDED,
+ /**
+ * Number of Generic Errors received from server.
+ */
+ GENERIC_ERROR_FROM_SERVER,
+ /**
+ * Number of Generic Errors received from client.
+ */
+ GENERIC_ERROR_FROM_CLIENT,
+ /**
+ * Number of ServiceName Errors received from server.
+ */
+ SERVICE_NAME_ERROR,
+ /**
+ * Number of AC-System Errors received from server.
+ */
+ AC_SYSTEM_ERROR;
+
+ /**
+ * Supported types of PPPoED agent counters.
+ */
+ public static final Set<PppoeAgentCounterNames> SUPPORTED_COUNTERS = ImmutableSet.of(
+ PppoeAgentCounterNames.PADI, PppoeAgentCounterNames.PADO,
+ PppoeAgentCounterNames.PADR, PppoeAgentCounterNames.PADS,
+ PppoeAgentCounterNames.PADT_FROM_SERVER, PppoeAgentCounterNames.PADT_FROM_CLIENT,
+ PppoeAgentCounterNames.PPPOED_PACKETS_TO_SERVER, PppoeAgentCounterNames.PPPOED_PACKETS_FROM_SERVER,
+ PppoeAgentCounterNames.MTU_EXCEEDED, PppoeAgentCounterNames.GENERIC_ERROR_FROM_SERVER,
+ PppoeAgentCounterNames.GENERIC_ERROR_FROM_CLIENT, PppoeAgentCounterNames.SERVICE_NAME_ERROR,
+ PppoeAgentCounterNames.AC_SYSTEM_ERROR);
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersIdentifier.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersIdentifier.java
new file mode 100644
index 0000000..cca163f
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersIdentifier.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import org.opencord.pppoeagent.PppoeAgentEvent;
+
+import java.util.Objects;
+
+/**
+ * Represents PPPoED agent counters identifier.
+ */
+public final class PppoeAgentCountersIdentifier {
+ final String counterClassKey;
+ final Enum<PppoeAgentCounterNames> counterTypeKey;
+
+ /**
+ * Creates a default global counter identifier for a given counterType.
+ *
+ * @param counterTypeKey Identifies the supported type of pppoe agent counters
+ */
+ public PppoeAgentCountersIdentifier(PppoeAgentCounterNames counterTypeKey) {
+ this.counterClassKey = PppoeAgentEvent.GLOBAL_COUNTER;
+ this.counterTypeKey = counterTypeKey;
+ }
+
+ /**
+ * Creates a counter identifier. A counter is defined by the key pair [counterClass, counterType],
+ * where counterClass can be global or the subscriber ID and counterType is the supported pppoe counter.
+ *
+ * @param counterClassKey Identifies which class the counter is assigned (global or per subscriber)
+ * @param counterTypeKey Identifies the supported type of pppoed relay counters
+ */
+ public PppoeAgentCountersIdentifier(String counterClassKey, PppoeAgentCounterNames counterTypeKey) {
+ this.counterClassKey = counterClassKey;
+ this.counterTypeKey = counterTypeKey;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof PppoeAgentCountersIdentifier) {
+ final PppoeAgentCountersIdentifier other = (PppoeAgentCountersIdentifier) obj;
+ return Objects.equals(this.counterClassKey, other.counterClassKey)
+ && Objects.equals(this.counterTypeKey, other.counterTypeKey);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(counterClassKey, counterTypeKey);
+ }
+
+ @Override
+ public String toString() {
+ return "PppoeAgentCountersIdentifier{" +
+ "counterClassKey='" + counterClassKey + '\'' +
+ ", counterTypeKey=" + counterTypeKey +
+ '}';
+ }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersStore.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersStore.java
new file mode 100644
index 0000000..5b4ddfd
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersStore.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.impl;
+
+import org.onosproject.store.Store;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentStoreDelegate;
+
+/**
+ * Represents a stored Pppoe Agent Counters. A counter entry is defined by the pair [counterClass, counterType],
+ * where counterClass can be maybe global or subscriber ID and counterType is the pppoe counter.
+ */
+public interface PppoeAgentCountersStore extends Store<PppoeAgentEvent, PppoeAgentStoreDelegate> {
+
+ String NAME = "PPPOE_Agent_stats";
+
+ /**
+ * Creates or updates PPPOE Agent counter.
+ *
+ * @param counterClass class of counters (global, per subscriber).
+ * @param counterType name of counter
+ */
+ void incrementCounter(String counterClass, PppoeAgentCounterNames counterType);
+
+ /**
+ * Sets the value of a PPPOE Agent counter.
+ *
+ * @param counterClass class of counters (global, per subscriber).
+ * @param counterType name of counter
+ * @param value The value of the counter
+ */
+ void setCounter(String counterClass, PppoeAgentCounterNames counterType, Long value);
+
+ /**
+ * Gets the current PPPoE Agent counter values.
+ *
+ * @return PPPoE Agent counter values
+ */
+ PppoeAgentStatistics getCounters();
+
+ /**
+ * Resets counter values for a given counter class.
+ *
+ * @param counterClass class of counters (global, per subscriber).
+ */
+ void resetCounters(String counterClass);
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentStatistics.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentStatistics.java
new file mode 100644
index 0000000..dce1ac7
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentStatistics.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Snapshot of PPPoE Agent statistics.
+ */
+public class PppoeAgentStatistics {
+ private final ImmutableMap<PppoeAgentCountersIdentifier, Long> counters;
+ private PppoeAgentStatistics(ImmutableMap<PppoeAgentCountersIdentifier, Long> counters) {
+ this.counters = counters;
+ }
+ /**
+ * Creates a new empty statistics instance.
+ */
+ public PppoeAgentStatistics() {
+ counters = ImmutableMap.of();
+ }
+ /**
+ * Gets the value of the counter with the given ID. Defaults to 0 if counter is not present.
+ *
+ * @param id counter ID
+ * @return counter value
+ */
+ public long get(PppoeAgentCountersIdentifier id) {
+ return counters.getOrDefault(id, 0L);
+ }
+ /**
+ * Gets the map of counters.
+ *
+ * @return map of counters
+ */
+ public Map<PppoeAgentCountersIdentifier, Long> counters() {
+ return counters;
+ }
+ /**
+ * Creates a new statistics instance with the given counter values.
+ *
+ * @param counters counters
+ * @return statistics
+ */
+ public static PppoeAgentStatistics withCounters(Map<PppoeAgentCountersIdentifier, Long> counters) {
+ ImmutableMap.Builder<PppoeAgentCountersIdentifier, Long> builder = ImmutableMap.builder();
+ counters.forEach(builder::put);
+ return new PppoeAgentStatistics(builder.build());
+ }
+ /**
+ * Adds the given statistics instance to this one (sums the common counters) and returns
+ * a new instance containing the result.
+ *
+ * @param other other instance
+ * @return result
+ */
+ public PppoeAgentStatistics add(PppoeAgentStatistics other) {
+ ImmutableMap.Builder<PppoeAgentCountersIdentifier, Long> builder = ImmutableMap.builder();
+ Set<PppoeAgentCountersIdentifier> keys = Sets.newHashSet(other.counters.keySet());
+ counters.forEach((id, value) -> {
+ builder.put(id, value + other.counters.getOrDefault(id, 0L));
+ keys.remove(id);
+ });
+ keys.forEach(i -> builder.put(i, other.counters.get(i)));
+ return new PppoeAgentStatistics(builder.build());
+ }
+ @Override
+ public String toString() {
+ MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this.getClass());
+ counters.forEach((id, v) -> helper.add(id.toString(), v));
+ return helper.toString();
+ }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/SimplePppoeAgentCountersStore.java b/app/src/main/java/org/opencord/pppoeagent/impl/SimplePppoeAgentCountersStore.java
new file mode 100644
index 0000000..8be88b7
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/SimplePppoeAgentCountersStore.java
@@ -0,0 +1,305 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SafeRecurringTask;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentStoreDelegate;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+
+import org.slf4j.Logger;
+import java.nio.charset.StandardCharsets;
+
+import java.util.Dictionary;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE_DEFAULT;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE_DEFAULT;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * PPPoE Agent Counters Manager Component.
+ */
+@Component(immediate = true,
+ property = {
+ PUBLISH_COUNTERS_RATE + ":Integer=" + PUBLISH_COUNTERS_RATE_DEFAULT,
+ SYNC_COUNTERS_RATE + ":Integer=" + SYNC_COUNTERS_RATE_DEFAULT,
+ }
+)
+public class SimplePppoeAgentCountersStore extends AbstractStore<PppoeAgentEvent, PppoeAgentStoreDelegate>
+ implements PppoeAgentCountersStore {
+ private static final String PPPOE_STATISTICS_LEADERSHIP = "pppoeagent-statistics";
+ private static final MessageSubject RESET_SUBJECT = new MessageSubject("pppoeagent-statistics-reset");
+
+ private final Logger log = getLogger(getClass());
+ private ConcurrentMap<PppoeAgentCountersIdentifier, Long> countersMap;
+
+ private EventuallyConsistentMap<NodeId, PppoeAgentStatistics> statistics;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ComponentConfigService componentConfigService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterCommunicationService clusterCommunicationService;
+ protected int publishCountersRate = PUBLISH_COUNTERS_RATE_DEFAULT;
+ protected int syncCountersRate = SYNC_COUNTERS_RATE_DEFAULT;
+ KryoNamespace serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(PppoeAgentStatistics.class)
+ .register(PppoeAgentCountersIdentifier.class)
+ .register(PppoeAgentCounterNames.class)
+ .register(ClusterMessage.class)
+ .register(MessageSubject.class)
+ .build();
+ private ScheduledExecutorService executor;
+ private ScheduledFuture<?> publisherTask;
+ private ScheduledFuture<?> syncTask;
+ private AtomicBoolean dirty = new AtomicBoolean(true);
+
+ @Activate
+ public void activate(ComponentContext context) {
+ log.info("Activate PPPoE Agent Counters Manager");
+ countersMap = new ConcurrentHashMap<>();
+ componentConfigService.registerProperties(getClass());
+ modified(context);
+ statistics = storageService.<NodeId, PppoeAgentStatistics>eventuallyConsistentMapBuilder()
+ .withName("pppoeagent-statistics")
+ .withSerializer(serializer)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp())
+ .build();
+ // Initialize counter values for the global counters
+ initCounters(PppoeAgentEvent.GLOBAL_COUNTER, statistics.get(clusterService.getLocalNode().id()));
+ syncStats();
+ leadershipService.runForLeadership(PPPOE_STATISTICS_LEADERSHIP);
+ executor = Executors.newScheduledThreadPool(1);
+ clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(serializer)::decode,
+ this::resetLocal, executor);
+ startSyncTask();
+ startPublishTask();
+ }
+
+ @Deactivate
+ public void deactivate() {
+ clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
+ leadershipService.withdraw(PPPOE_STATISTICS_LEADERSHIP);
+ stopPublishTask();
+ stopSyncTask();
+ executor.shutdownNow();
+ componentConfigService.unregisterProperties(getClass(), false);
+ }
+ @Modified
+ public void modified(ComponentContext context) {
+ Dictionary<String, Object> properties = context.getProperties();
+ String s = Tools.get(properties, PUBLISH_COUNTERS_RATE);
+ int oldPublishCountersRate = publishCountersRate;
+ publishCountersRate = Strings.isNullOrEmpty(s) ? PUBLISH_COUNTERS_RATE_DEFAULT
+ : Integer.parseInt(s.trim());
+ if (oldPublishCountersRate != publishCountersRate) {
+ stopPublishTask();
+ startPublishTask();
+ }
+ s = Tools.get(properties, SYNC_COUNTERS_RATE);
+ int oldSyncCountersRate = syncCountersRate;
+ syncCountersRate = Strings.isNullOrEmpty(s) ? SYNC_COUNTERS_RATE_DEFAULT
+ : Integer.parseInt(s.trim());
+ if (oldSyncCountersRate != syncCountersRate) {
+ stopSyncTask();
+ startSyncTask();
+ }
+ }
+ private ScheduledFuture<?> startTask(Runnable r, int rate) {
+ return executor.scheduleAtFixedRate(SafeRecurringTask.wrap(r),
+ 0, rate, TimeUnit.SECONDS);
+ }
+ private void stopTask(ScheduledFuture<?> task) {
+ task.cancel(true);
+ }
+ private void startSyncTask() {
+ syncTask = startTask(this::syncStats, syncCountersRate);
+ }
+ private void stopSyncTask() {
+ stopTask(syncTask);
+ }
+ private void startPublishTask() {
+ publisherTask = startTask(this::publishStats, publishCountersRate);
+ }
+ private void stopPublishTask() {
+ stopTask(publisherTask);
+ }
+
+ ImmutableMap<PppoeAgentCountersIdentifier, Long> getCountersMap() {
+ return ImmutableMap.copyOf(countersMap);
+ }
+
+ public PppoeAgentStatistics getCounters() {
+ return aggregate();
+ }
+
+ /**
+ * Initialize the supported counters map for the given counter class.
+ * @param counterClass class of counters (global, per subscriber)
+ * @param existingStats existing values to intialise the counters to
+ */
+ public void initCounters(String counterClass, PppoeAgentStatistics existingStats) {
+ checkNotNull(counterClass, "counter class can't be null");
+ for (PppoeAgentCounterNames counterType : PppoeAgentCounterNames.SUPPORTED_COUNTERS) {
+ PppoeAgentCountersIdentifier id = new PppoeAgentCountersIdentifier(counterClass, counterType);
+ countersMap.put(id, existingStats == null ? 0L : existingStats.get(id));
+ }
+ }
+
+ /**
+ * Inserts the counter entry if it is not already in the set otherwise increment the existing counter entry.
+ * @param counterClass class of counters (global, per subscriber)
+ * @param counterType name of counter
+ */
+ public void incrementCounter(String counterClass, PppoeAgentCounterNames counterType) {
+ checkNotNull(counterClass, "counter class can't be null");
+ if (PppoeAgentCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
+ PppoeAgentCountersIdentifier counterIdentifier =
+ new PppoeAgentCountersIdentifier(counterClass, counterType);
+ countersMap.compute(counterIdentifier, (key, counterValue) ->
+ (counterValue != null) ? counterValue + 1 : 1L);
+ } else {
+ log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
+ }
+ dirty.set(true);
+ }
+
+ /**
+ * Reset the counters map for the given counter class.
+ * @param counterClass class of counters (global, per subscriber)
+ */
+ public void resetCounters(String counterClass) {
+ byte[] payload = counterClass.getBytes(StandardCharsets.UTF_8);
+ ClusterMessage reset = new ClusterMessage(clusterService.getLocalNode().id(), RESET_SUBJECT, payload);
+ clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT, Serializer.using(serializer)::encode);
+ }
+ private void resetLocal(ClusterMessage m) {
+ String counterClass = new String(m.payload(), StandardCharsets.UTF_8);
+ checkNotNull(counterClass, "counter class can't be null");
+ for (PppoeAgentCounterNames counterType : PppoeAgentCounterNames.SUPPORTED_COUNTERS) {
+ PppoeAgentCountersIdentifier counterIdentifier =
+ new PppoeAgentCountersIdentifier(counterClass, counterType);
+ countersMap.computeIfPresent(counterIdentifier, (key, counterValue) -> 0L);
+ }
+ dirty.set(true);
+ syncStats();
+ }
+
+ /**
+ * Inserts the counter entry if it is not already in the set otherwise update the existing counter entry.
+ * @param counterClass class of counters (global, per subscriber).
+ * @param counterType name of counter
+ * @param value counter value
+ */
+ public void setCounter(String counterClass, PppoeAgentCounterNames counterType, Long value) {
+ checkNotNull(counterClass, "counter class can't be null");
+ if (PppoeAgentCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
+ PppoeAgentCountersIdentifier counterIdentifier =
+ new PppoeAgentCountersIdentifier(counterClass, counterType);
+ countersMap.put(counterIdentifier, value);
+ } else {
+ log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
+ }
+ dirty.set(true);
+ syncStats();
+ }
+
+ private PppoeAgentStatistics aggregate() {
+ return statistics.values().stream()
+ .reduce(new PppoeAgentStatistics(), PppoeAgentStatistics::add);
+ }
+ /**
+ * Creates a snapshot of the current in-memory statistics.
+ *
+ * @return snapshot of statistics
+ */
+ private PppoeAgentStatistics snapshot() {
+ return PppoeAgentStatistics.withCounters(countersMap);
+ }
+ /**
+ * Syncs in-memory stats to the eventually consistent map.
+ */
+ private void syncStats() {
+ if (dirty.get()) {
+ statistics.put(clusterService.getLocalNode().id(), snapshot());
+ dirty.set(false);
+ }
+ }
+ private void publishStats() {
+ // Only publish events if we are the cluster leader for PPPoE Agent stats
+ if (!Objects.equals(leadershipService.getLeader(PPPOE_STATISTICS_LEADERSHIP),
+ clusterService.getLocalNode().id())) {
+ return;
+ }
+ if (aggregate().counters() != null) {
+ aggregate().counters().forEach((counterKey, counterValue) -> {
+ // Subscriber-specific counters have the subscriber ID set
+ String subscriberId = null;
+ if (!counterKey.counterClassKey.equals(PppoeAgentEvent.GLOBAL_COUNTER)) {
+ subscriberId = counterKey.counterClassKey;
+ }
+ if (delegate != null) {
+ delegate.notify(new PppoeAgentEvent(PppoeAgentEvent.Type.STATS_UPDATE, null,
+ counterKey.counterTypeKey.toString(), counterValue,
+ null, subscriberId));
+ }
+ });
+ } else {
+ log.debug("Ignoring 'publishStats' request since counters are null.");
+ }
+ }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/package-info.java b/app/src/main/java/org/opencord/pppoeagent/impl/package-info.java
new file mode 100644
index 0000000..84ceb21
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * PPPoE agent implementation.
+ */
+package org.opencord.pppoeagent.impl;
diff --git a/app/src/main/java/org/opencord/pppoeagent/rest/PppoeAgentWebResource.java b/app/src/main/java/org/opencord/pppoeagent/rest/PppoeAgentWebResource.java
new file mode 100644
index 0000000..56d5598
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/rest/PppoeAgentWebResource.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.rest;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.util.Map;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onlab.util.ItemNotFoundException;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.Port;
+import org.onosproject.rest.AbstractWebResource;
+
+import org.opencord.pppoeagent.impl.PppoeAgentCounterNames;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersStore;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersIdentifier;
+import org.opencord.pppoeagent.impl.PppoeAgentStatistics;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentService;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
+
+/**
+ * PppoeAgent web resource.
+ */
+@Path("pppoeagent-app")
+public class PppoeAgentWebResource extends AbstractWebResource {
+ private final ObjectNode root = mapper().createObjectNode();
+ private final ArrayNode node = root.putArray("entry");
+ private static final String SESSION_NOT_FOUND = "Session not found";
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ DeviceService deviceService = AbstractShellCommand.get(DeviceService.class);
+
+ /**
+ * Get session info object.
+ *
+ * @param mac Session MAC address
+ *
+ * @return 200 OK
+ */
+ @GET
+ @Path("/session/{mac}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getSubscriber(@PathParam("mac") String mac) {
+ MacAddress macAddress = MacAddress.valueOf(mac);
+ PppoeAgentService pppoeAgent = get(PppoeAgentService.class);
+ PppoeSessionInfo entry = pppoeAgent.getSessionsMap().get(macAddress);
+ if (entry == null) {
+ throw new ItemNotFoundException(SESSION_NOT_FOUND);
+ }
+
+ try {
+ node.add(encodePppoeSessionInfo(entry, macAddress));
+ return ok(mapper().writeValueAsString(root)).build();
+ } catch (IllegalArgumentException e) {
+ log.error("Error while fetching PPPoE session info for MAC {} through REST API: {}", mac, e.getMessage());
+ return Response.status(INTERNAL_SERVER_ERROR).build();
+ } catch (JsonProcessingException e) {
+ log.error("Error assembling JSON response for PPPoE session info request for MAC {} " +
+ "through REST API: {}", mac, e.getMessage());
+ return Response.status(INTERNAL_SERVER_ERROR).build();
+ }
+ }
+
+ /**
+ * Get all session info objects.
+ *
+ * @return 200 OK
+ */
+ @GET
+ @Path("/session")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getSubscribers() {
+ try {
+ PppoeAgentService pppoeAgent = get(PppoeAgentService.class);
+ pppoeAgent.getSessionsMap().forEach((mac, entry) -> {
+ node.add(encodePppoeSessionInfo(entry, mac));
+ });
+
+ return ok(mapper().writeValueAsString(root)).build();
+ } catch (Exception e) {
+ log.error("Error while fetching PPPoE sessions information through REST API: {}", e.getMessage());
+ return Response.status(INTERNAL_SERVER_ERROR).build();
+ }
+ }
+
+ private ObjectNode encodePppoeSessionInfo(PppoeSessionInfo entry, MacAddress macAddress) {
+ ConnectPoint cp = entry.getClientCp();
+ Port devicePort = deviceService.getPort(cp);
+ String portLabel = "uni-" + ((cp.port().toLong() & 0xF) + 1);
+ String subscriberId = devicePort != null ? devicePort.annotations().value(AnnotationKeys.PORT_NAME) :
+ "UNKNOWN";
+
+ return mapper().createObjectNode()
+ .put("macAddress", macAddress.toString())
+ .put("sessionId", entry.getSessionId())
+ .put("currentState", entry.getCurrentState())
+ .put("lastReceivedPacket", PPPoED.Type.getTypeByValue(entry.getPacketCode()).name())
+ .put("deviceId", cp.deviceId().toString())
+ .put("portNumber", cp.port().toString())
+ .put("portLabel", portLabel)
+ .put("subscriberId", subscriberId);
+ }
+
+ /**
+ * Gets PPPoE Agent counters for global context.
+ *
+ * @return 200 OK
+ */
+ @GET
+ @Path("/stats")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getPppoeStats() {
+ return getStats(PppoeAgentEvent.GLOBAL_COUNTER);
+ }
+
+ /**
+ * Gets PPPoE Agent counters for specific subscriber.
+ *
+ * @param subscriberId Id of subscriber.
+ *
+ * @return 200 OK
+ */
+ @GET
+ @Path("/stats/{subscriberId}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getPppoeSubscriberStats(@PathParam("subscriberId") String subscriberId) {
+ return getStats(subscriberId);
+ }
+ private Response getStats(String key) {
+ PppoeAgentCountersStore pppoeCounters = get(PppoeAgentCountersStore.class);
+ try {
+ PppoeAgentStatistics pppoeStatistics = pppoeCounters.getCounters();
+ JsonNode completeNode = buildPppoeCounterNodeObject(key, pppoeStatistics.counters());
+ return ok(mapper().writeValueAsString(completeNode)).build();
+ } catch (JsonProcessingException e) {
+ log.error("Error while fetching PPPoE agent counter stats through REST API: {}", e.getMessage());
+ return Response.status(INTERNAL_SERVER_ERROR).build();
+ }
+ }
+
+ private JsonNode buildPppoeCounterNodeObject(String key, Map<PppoeAgentCountersIdentifier, Long> countersMap) {
+ ObjectNode entryNode = mapper().createObjectNode();
+ for (PppoeAgentCounterNames counterType : PppoeAgentCounterNames.SUPPORTED_COUNTERS) {
+ Long value = countersMap.get(new PppoeAgentCountersIdentifier(key, counterType));
+ if (value == null) {
+ continue;
+ }
+ entryNode = entryNode.put(counterType.name(), String.valueOf(value));
+ }
+ return mapper().createObjectNode().set(key, entryNode);
+ }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/rest/package-info.java b/app/src/main/java/org/opencord/pppoeagent/rest/package-info.java
new file mode 100644
index 0000000..70a18d3
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/rest/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Rest interface for PppoeAgent.
+ */
+package org.opencord.pppoeagent.rest;
diff --git a/app/src/main/webapp/WEB-INF/web.xml b/app/src/main/webapp/WEB-INF/web.xml
new file mode 100644
index 0000000..9bb27a0
--- /dev/null
+++ b/app/src/main/webapp/WEB-INF/web.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2021-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee"
+ xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+ xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+ id="ONOS" version="2.5">
+ <display-name>PPPoE Agent REST API v1.0</display-name>
+
+ <servlet>
+ <servlet-name>JAX-RS Service</servlet-name>
+ <servlet-class>org.glassfish.jersey.servlet.ServletContainer</servlet-class>
+ <init-param>
+ <param-name>jersey.config.server.provider.classnames</param-name>
+ <param-value>
+ org.opencord.pppoeagent.rest.PppoeAgentWebResource
+ </param-value>
+ </init-param>
+ <load-on-startup>1</load-on-startup>
+ </servlet>
+
+ <servlet-mapping>
+ <servlet-name>JAX-RS Service</servlet-name>
+ <url-pattern>/*</url-pattern>
+ </servlet-mapping>
+</web-app>