[VOL-3661] PPPoE IA application - initial commit

Change-Id: Idaf23f8736cba955fe8a3049b8fc9c85b3cd3ab9
Signed-off-by: Gustavo Silva <gsilva@furukawalatam.com>
diff --git a/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentShowUsersCommand.java b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentShowUsersCommand.java
new file mode 100644
index 0000000..a67ffca
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentShowUsersCommand.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.cli;
+
+import org.opencord.pppoeagent.PppoeAgentService;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Port;
+import org.onosproject.net.device.DeviceService;
+
+/**
+ * CLI command that lists the PPPoE sessions/users learned by the agent.
+ */
+@Service
+@Command(scope = "pppoe", name = "pppoe-users",
+        description = "Shows the PPPoE users")
+public class PppoeAgentShowUsersCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "mac", description = "MAC address related to PPPoE session.",
+            required = false, multiValued = false)
+    private String macStr = null;
+
+    /** Prints the session of the given MAC, or every session when no MAC is supplied. */
+    @Override
+    protected void doExecute() {
+        MacAddress macAddress = null;
+        if (macStr != null && !macStr.isEmpty()) {
+            try {
+                macAddress = MacAddress.valueOf(macStr);
+            } catch (IllegalArgumentException e) {
+                // Report on the console as well; with a log entry only the command looks silent.
+                log.error("Invalid MAC address {}", macStr, e);
+                print("Invalid MAC address %s: %s", macStr, e.getMessage());
+                return;
+            }
+        }
+
+        DeviceService deviceService = AbstractShellCommand.get(DeviceService.class);
+        PppoeAgentService pppoeAgentService = AbstractShellCommand.get(PppoeAgentService.class);
+
+        if (macAddress == null) {
+            pppoeAgentService.getSessionsMap().forEach((mac, sessionInfo) ->
+                    printPppoeInfo(mac, sessionInfo, deviceService.getPort(sessionInfo.getClientCp())));
+            return;
+        }
+        PppoeSessionInfo singleInfo = pppoeAgentService.getSessionsMap().get(macAddress);
+        if (singleInfo == null) {
+            print("No session information found for provided MAC address %s", macAddress.toString());
+            return;
+        }
+        printPppoeInfo(macAddress, singleInfo, deviceService.getPort(singleInfo.getClientCp()));
+    }
+
+    /** Prints one comma-separated line for a session; the subscriber is "UNKNOWN" without a port. */
+    private void printPppoeInfo(MacAddress macAddr, PppoeSessionInfo sessionInfo, Port devicePort) {
+        PPPoED.Type lastReceivedPkt = PPPoED.Type.getTypeByValue(sessionInfo.getPacketCode());
+        ConnectPoint cp = sessionInfo.getClientCp();
+        String subscriberId = devicePort != null ? devicePort.annotations().value(AnnotationKeys.PORT_NAME) :
+                "UNKNOWN";
+        print("MacAddress=%s,SessionId=%s,CurrentState=%s,LastReceivedPacket=%s,DeviceId=%s,PortNumber=%s," +
+                        "SubscriberId=%s",
+                macAddr.toString(), String.valueOf(sessionInfo.getSessionId()),
+                sessionInfo.getCurrentState(), lastReceivedPkt.name(),
+                cp.deviceId().toString(), cp.port().toString(), subscriberId);
+    }
+}
diff --git a/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentStatsCommand.java b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentStatsCommand.java
new file mode 100644
index 0000000..885bf36
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/cli/PppoeAgentStatsCommand.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.cli;
+
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import java.util.Collections;
+import java.util.Map;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersIdentifier;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersStore;
+import org.opencord.pppoeagent.impl.PppoeAgentCounterNames;
+
+/**
+ * Display/Reset the PPPoE Agent application statistics.
+ */
+@Service
+@Command(scope = "pppoe", name = "pppoeagent-stats",
+        description = "Display or Reset the PPPoE Agent application statistics.")
+public class PppoeAgentStatsCommand extends AbstractShellCommand {
+    private static final String CONFIRM_PHRASE = "please";
+    // Width of the dotted name column in the plain-text output.
+    private static final int NAME_COLUMN_WIDTH = 50;
+
+    @Option(name = "-r", aliases = "--reset", description = "Resets a specific counter.\n",
+            required = false, multiValued = false)
+    PppoeAgentCounterNames reset = null;
+
+    @Option(name = "-R", aliases = "--reset-all", description = "Resets all counters.\n",
+            required = false, multiValued = false)
+    boolean resetAll = false;
+
+    @Option(name = "-s", aliases = "--subscriberId", description = "Subscriber Id.\n",
+            required = false, multiValued = false)
+    String subscriberId = null;
+
+    @Option(name = "-p", aliases = "--please", description = "Confirmation phrase.",
+            required = false, multiValued = false)
+    String please = null;
+
+    @Argument(index = 0, name = "counter",
+            description = "The counter to display. In case not specified, all counters will be displayed.",
+            required = false, multiValued = false)
+    PppoeAgentCounterNames counter = null;
+
+    /** Resets counters (needs the confirmation phrase) or prints one/all counters of a subscriber. */
+    @Override
+    protected void doExecute() {
+        PppoeAgentCountersStore pppoeCounters = AbstractShellCommand.get(PppoeAgentCountersStore.class);
+
+        if ((subscriberId == null) || (subscriberId.equals("global"))) {
+            // All subscriber Ids
+            subscriberId = PppoeAgentEvent.GLOBAL_COUNTER;
+        }
+
+        if (resetAll || reset != null) {
+            if (!CONFIRM_PHRASE.equals(please)) {
+                print("WARNING: Be aware that you are going to reset the counters. " +
+                        "Enter confirmation phrase to continue.");
+                return;
+            }
+            if (resetAll) {
+                pppoeCounters.resetCounters(subscriberId);
+            } else {
+                pppoeCounters.setCounter(subscriberId, reset, (long) 0);
+            }
+            return;
+        }
+
+        Map<PppoeAgentCountersIdentifier, Long> countersMap = pppoeCounters.getCounters().counters();
+        if (countersMap.isEmpty()) {
+            print("No PPPoE Agent Counters were created yet for counter class [%s]",
+                    PppoeAgentEvent.GLOBAL_COUNTER);
+            return;
+        }
+
+        boolean json = outputJson();
+        if (counter != null) {
+            // Show only the specified counter
+            long value = counterValue(countersMap, counter);
+            if (json) {
+                print("{\"%s\":%d}", counter, value);
+            } else {
+                printCounter(counter, value);
+            }
+            return;
+        }
+
+        if (!json) {
+            print("%s [%s] :", pppoeCounters.NAME, subscriberId);
+        }
+        StringBuilder jsonString = new StringBuilder(String.format("{\"%s\":{", pppoeCounters.NAME));
+        PppoeAgentCounterNames[] counters = PppoeAgentCounterNames.values();
+        for (int i = 0; i < counters.length; i++) {
+            long value = counterValue(countersMap, counters[i]);
+            if (json) {
+                jsonString.append(String.format("\"%s\":%d", counters[i], value))
+                        .append(i < counters.length - 1 ? "," : "");
+            } else {
+                printCounter(counters[i], value);
+            }
+        }
+        if (json) {
+            print("%s", jsonString.append("}}"));
+        }
+    }
+
+    /** Returns the stored value of a counter for the selected subscriber, or 0 when it is absent. */
+    private long counterValue(Map<PppoeAgentCountersIdentifier, Long> countersMap, PppoeAgentCounterNames name) {
+        Long value = countersMap.get(new PppoeAgentCountersIdentifier(subscriberId, name));
+        return value == null ? 0L : value;
+    }
+
+    /** Prints a counter as "name ..... value"; the dots pad the name up to a fixed column. */
+    void printCounter(PppoeAgentCounterNames c, long value) {
+        // Clamp at zero: nCopies throws for a negative count, i.e. for a name longer than the column.
+        int dots = Math.max(0, NAME_COLUMN_WIDTH - c.toString().length());
+        print("  %s %s %-4d", c, String.join("", Collections.nCopies(dots, ".")), value);
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/cli/package-info.java b/app/src/main/java/org/opencord/pppoeagent/cli/package-info.java
new file mode 100644
index 0000000..515908f
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/cli/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * CLI commands for the PPPoE agent.
+ */
+package org.opencord.pppoeagent.cli;
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/pppoeagent/impl/OsgiPropertyConstants.java
new file mode 100644
index 0000000..65c8310
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/OsgiPropertyConstants.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.impl;
+
+/**
+ * Names and default values of the configurable component properties.
+ */
+public final class OsgiPropertyConstants {
+    // Switch for circuit-id validation (a PppoeAgent component property).
+    public static final String ENABLE_CIRCUIT_ID_VALIDATION = "enableCircuitIdValidation";
+    public static final boolean ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT = true;
+    // Counters topic; presumably where counters get published - consumer not visible here, TODO confirm.
+    public static final String PPPOE_COUNTERS_TOPIC = "pppoeCountersTopic";
+    public static final String PPPOE_COUNTERS_TOPIC_DEFAULT = "onos.pppoe.stats.kpis";
+    // Counters publish rate; unit not visible here (presumably seconds - TODO confirm).
+    public static final String PUBLISH_COUNTERS_RATE = "publishCountersRate";
+    public static final int PUBLISH_COUNTERS_RATE_DEFAULT = 10;
+    // Maximum PPPoE MTU (a PppoeAgent component property).
+    public static final String PPPOE_MAX_MTU = "pppoeMaxMtu";
+    public static final int PPPOE_MAX_MTU_DEFAULT = 1500;
+    // Size of the thread pool that processes packets in PppoeAgent.
+    public static final String PACKET_PROCESSOR_THREADS = "packetProcessorThreads";
+    public static final int PACKET_PROCESSOR_THREADS_DEFAULT = 10;
+    // Counters sync rate; unit not visible here (presumably seconds - TODO confirm).
+    public static final String SYNC_COUNTERS_RATE = "syncCountersRate";
+    public static final int SYNC_COUNTERS_RATE_DEFAULT = 5;
+
+    private OsgiPropertyConstants() {
+    }
+}
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgent.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgent.java
new file mode 100644
index 0000000..67c71da
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgent.java
@@ -0,0 +1,929 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+
+import com.google.common.collect.Sets;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentListener;
+import org.opencord.pppoeagent.PppoeAgentService;
+import org.opencord.pppoeagent.PppoeAgentStoreDelegate;
+import org.opencord.pppoeagent.PPPoEDVendorSpecificTag;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Serializer;
+import com.google.common.collect.ImmutableMap;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+
+import org.onlab.util.KryoNamespace;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onlab.packet.PPPoEDTag;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipListener;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.config.ConfigFactory;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketService;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.opencord.sadis.UniTagInformation;
+import org.opencord.pppoeagent.util.CircuitIdBuilder;
+import org.opencord.pppoeagent.util.CircuitIdFieldName;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import org.onosproject.store.service.Versioned;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADI;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADO;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADR;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADS;
+import static org.onlab.packet.PPPoED.PPPOED_CODE_PADT;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_AC_SYSTEM_ERROR;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_GENERIC_ERROR;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_SERVICE_NAME_ERROR;
+import static org.onlab.packet.PPPoEDTag.PPPOED_TAG_VENDOR_SPECIFIC;
+
+import static org.onlab.util.Tools.getIntegerProperty;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
+
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PPPOE_MAX_MTU;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PPPOE_MAX_MTU_DEFAULT;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.ENABLE_CIRCUIT_ID_VALIDATION;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PACKET_PROCESSOR_THREADS;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PACKET_PROCESSOR_THREADS_DEFAULT;
+
+/**
+ * PPPoE Intermediate Agent application.
+ */
+@Component(immediate = true,
+property = {
+        PPPOE_MAX_MTU + ":Integer=" + PPPOE_MAX_MTU_DEFAULT,
+        ENABLE_CIRCUIT_ID_VALIDATION + ":Boolean=" + ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT,
+        PACKET_PROCESSOR_THREADS + ":Integer=" + PACKET_PROCESSOR_THREADS_DEFAULT,
+})
+public class PppoeAgent
+        extends AbstractListenerManager<PppoeAgentEvent, PppoeAgentListener>
+        implements PppoeAgentService {
+    private static final String APP_NAME = "org.opencord.pppoeagent";
+    private static final short QINQ_VID_NONE = (short) -1;
+
+    private final InternalConfigListener cfgListener = new InternalConfigListener();
+    private final Set<ConfigFactory> factories = ImmutableSet.of(
+            new ConfigFactory<>(APP_SUBJECT_FACTORY, PppoeAgentConfig.class, "pppoeagent") {
+                @Override
+                public PppoeAgentConfig createConfig() {
+                    return new PppoeAgentConfig();
+                }
+            }
+    );
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected NetworkConfigRegistry cfgService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected PacketService packetService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService componentConfigService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected SadisService sadisService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected PppoeAgentCountersStore pppoeAgentCounters;
+
+    // OSGi properties; modified() refreshes them from the component configuration.
+    protected int pppoeMaxMtu = PPPOE_MAX_MTU_DEFAULT;
+    protected boolean enableCircuitIdValidation = ENABLE_CIRCUIT_ID_VALIDATION_DEFAULT;
+    protected int packetProcessorThreads = PACKET_PROCESSOR_THREADS_DEFAULT;
+    // Runtime state; appId and subsService are assigned in activate().
+    private ApplicationId appId;
+    private InnerDeviceListener deviceListener = new InnerDeviceListener();
+    private InnerMastershipListener changeListener = new InnerMastershipListener();
+    private PppoeAgentPacketProcessor pppoeAgentPacketProcessor = new PppoeAgentPacketProcessor();
+    protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
+    private PppoeAgentStoreDelegate delegate = new InnerPppoeAgentStoreDelegate();
+    // Candidate server connect points (loaded by updateConfig) and the one currently selected.
+    Set<ConnectPoint> pppoeConnectPoints;
+    protected AtomicReference<ConnectPoint> pppoeServerConnectPoint = new AtomicReference<>();
+    protected boolean useOltUplink = false;
+    // Client MAC -> session info, built in activate(). NOTE(review): static yet reassigned per activation.
+    static ConsistentMap<MacAddress, PppoeSessionInfo> sessionsMap;
+    /** Returns an immutable copy of the current sessions map. */
+    @Override
+    public Map<MacAddress, PppoeSessionInfo> getSessionsMap() {
+        return ImmutableMap.copyOf(sessionsMap.asJavaMap());
+    }
+    /** Removes every entry from the sessions map. */
+    @Override
+    public void clearSessionsMap() {
+        sessionsMap.clear();
+    }
+
+    private final ArrayList<CircuitIdFieldName> circuitIdfields = new ArrayList<>(Arrays.asList(
+            CircuitIdFieldName.AcessNodeIdentifier,
+            CircuitIdFieldName.Slot,
+            CircuitIdFieldName.Port,
+            CircuitIdFieldName.OnuSerialNumber));
+
+    protected ExecutorService packetProcessorExecutor;
+
+    /** Registers the app, builds the sessions store, hooks the listeners and starts packet processing. */
+    @Activate
+    protected void activate(ComponentContext context) {
+        eventDispatcher.addSink(PppoeAgentEvent.class, listenerRegistry);
+        appId = coreService.registerApplication(APP_NAME);
+        cfgService.addListener(cfgListener);
+        componentConfigService.registerProperties(getClass());
+
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(PppoeSessionInfo.class)
+                .register(MacAddress.class)
+                .register(SubscriberAndDeviceInformation.class)
+                .register(UniTagInformation.class)
+                .register(ConnectPoint.class)
+                .build();
+        sessionsMap = storageService.<MacAddress, PppoeSessionInfo>consistentMapBuilder()
+                .withName("pppoeagent-sessions")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build();
+
+        factories.forEach(cfgService::registerConfigFactory);
+        deviceService.addListener(deviceListener);
+        subsService = sadisService.getSubscriberInfoService();
+        mastershipService.addListener(changeListener);
+        pppoeAgentCounters.setDelegate(delegate);
+        updateConfig();
+        // Create the executor (in modified) before registering the processor, which submits packets to it.
+        if (context != null) {
+            modified(context);
+        }
+        packetService.addProcessor(pppoeAgentPacketProcessor, PacketProcessor.director(0));
+        log.info("PPPoE Intermediate Agent Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        cfgService.removeListener(cfgListener);
+        factories.forEach(cfgService::unregisterConfigFactory);
+        packetService.removeProcessor(pppoeAgentPacketProcessor);
+        packetProcessorExecutor.shutdown();
+        componentConfigService.unregisterProperties(getClass(), false);
+        deviceService.removeListener(deviceListener);
+        mastershipService.removeListener(changeListener); // registered in activate(); must not outlive us
+        eventDispatcher.removeSink(PppoeAgentEvent.class);
+        pppoeAgentCounters.unsetDelegate(delegate);
+        log.info("PPPoE Intermediate Agent Stopped");
+    }
+    /** Reloads the server connect points and the use-OLT-uplink flag from the app's network config. */
+    private void updateConfig() {
+        PppoeAgentConfig agentConfig = cfgService.getConfig(appId, PppoeAgentConfig.class);
+        if (agentConfig == null) {
+            log.warn("PPPoE server info not available");
+            return;
+        }
+        synchronized (this) {
+            pppoeConnectPoints = Sets.newConcurrentHashSet(agentConfig.getPppoeServerConnectPoint());
+        }
+        useOltUplink = agentConfig.getUseOltUplinkForServerPktInOut();
+    }
+
+    /**
+     * Returns whether the passed port is the uplink port of the olt device.
+     * Always false when this node is not master of the device or the device/sadis entry is unknown.
+     */
+    private boolean isUplinkPortOfOlt(DeviceId dId, Port p) {
+        log.debug("isUplinkPortOfOlt: DeviceId: {} Port: {}", dId, p);
+        if (!mastershipService.isLocalMaster(dId)) {
+            return false;
+        }
+        Device d = deviceService.getDevice(dId);
+        if (d == null) {
+            // The device may already be gone from the device store; nothing to compare against.
+            return false;
+        }
+        SubscriberAndDeviceInformation deviceInfo = subsService.get(d.serialNumber());
+        return deviceInfo != null && deviceInfo.uplinkPort() == p.number().toLong();
+    }
+
+    /**
+     * Returns the connectPoint which is the uplink port of the OLT, or null when the
+     * device, its sadis entry or the uplink port cannot be found.
+     */
+    private ConnectPoint getUplinkConnectPointOfOlt(DeviceId dId) {
+        Device d = deviceService.getDevice(dId);
+        // An unknown device is treated like a missing sadis entry instead of failing on d.serialNumber().
+        SubscriberAndDeviceInformation deviceInfo = d == null ? null : subsService.get(d.serialNumber());
+        log.debug("getUplinkConnectPointOfOlt DeviceId: {} devInfo: {}", dId, deviceInfo);
+        if (deviceInfo == null) {
+            return null;
+        }
+
+        PortNumber pNum = PortNumber.portNumber(deviceInfo.uplinkPort());
+        return deviceService.getPort(d.id(), pNum) != null ? new ConnectPoint(d.id(), pNum) : null;
+    }
+
+    /** Applies changed component properties: max MTU, circuit-id validation, packet pool size. */
+    @Modified
+    protected void modified(ComponentContext context) {
+        Dictionary<?, ?> properties = context.getProperties();
+
+        Integer newPppoeMaxMtu = getIntegerProperty(properties, PPPOE_MAX_MTU);
+        if (newPppoeMaxMtu != null && newPppoeMaxMtu < 0) {
+            log.error("Invalid newPppoeMaxMtu : {}, defaulting to {}", newPppoeMaxMtu, PPPOE_MAX_MTU_DEFAULT);
+            pppoeMaxMtu = PPPOE_MAX_MTU_DEFAULT;
+        } else if (newPppoeMaxMtu != null && newPppoeMaxMtu != pppoeMaxMtu) {
+            log.info("PPPOE MTU modified from {} to {}", pppoeMaxMtu, newPppoeMaxMtu);
+            pppoeMaxMtu = newPppoeMaxMtu;
+        }
+
+        Boolean newEnableCircuitIdValidation = Tools.isPropertyEnabled(properties, ENABLE_CIRCUIT_ID_VALIDATION);
+        if (newEnableCircuitIdValidation != null && enableCircuitIdValidation != newEnableCircuitIdValidation) {
+            log.info("Property enableCircuitIdValidation modified from {} to {}",
+                    enableCircuitIdValidation, newEnableCircuitIdValidation);
+            enableCircuitIdValidation = newEnableCircuitIdValidation;
+        }
+
+        // Keep the current pool size on a malformed or non-positive value: both would otherwise throw.
+        String s = Tools.get(properties, PACKET_PROCESSOR_THREADS);
+        int oldpacketProcessorThreads = packetProcessorThreads;
+        try {
+            int requested = Strings.isNullOrEmpty(s) ? oldpacketProcessorThreads : Integer.parseInt(s.trim());
+            packetProcessorThreads = requested > 0 ? requested : oldpacketProcessorThreads;
+        } catch (NumberFormatException e) {
+            log.warn("Invalid {} value '{}', keeping {}", PACKET_PROCESSOR_THREADS, s, oldpacketProcessorThreads);
+        }
+        if (packetProcessorExecutor == null || oldpacketProcessorThreads != packetProcessorThreads) {
+            if (packetProcessorExecutor != null) {
+                packetProcessorExecutor.shutdown();
+            }
+            packetProcessorExecutor = newFixedThreadPool(packetProcessorThreads,
+                    groupedThreads("onos/pppoe", "pppoe-packet-%d", log));
+        }
+    }
+
+    /**
+     * Selects a connect point through an available device for which it is the master.
+     * The previous selection is cleared first and stays null when no candidate qualifies.
+     */
+    private void selectServerConnectPoint() {
+        synchronized (this) {
+            pppoeServerConnectPoint.set(null);
+            if (pppoeConnectPoints != null) {
+                for (ConnectPoint cp : pppoeConnectPoints) {
+                    // Skip mastered-but-unavailable devices instead of stopping at the first one:
+                    // a later candidate may still qualify.
+                    if (mastershipService.isLocalMaster(cp.deviceId()) && deviceService.isAvailable(cp.deviceId())) {
+                        pppoeServerConnectPoint.set(cp);
+                        log.info("PPPOE connectPoint selected is {}", cp);
+                        break;
+                    }
+                }
+            }
+            log.info("PPPOE Server connectPoint is {}", pppoeServerConnectPoint.get());
+            if (pppoeServerConnectPoint.get() == null) {
+                log.error("Master of none, can't relay PPPOE messages to server");
+            }
+        }
+    }
+
+    private SubscriberAndDeviceInformation getSubscriber(ConnectPoint cp) {
+        Port p = deviceService.getPort(cp);
+        // No port (e.g. already removed) means no subscriber; callers get null instead of an NPE.
+        return p == null ? null : subsService.get(p.annotations().value(AnnotationKeys.PORT_NAME));
+    }
+
+    /** Looks up the sadis entry of the device the packet was received from, keyed by serial number. */
+    private SubscriberAndDeviceInformation getDevice(PacketContext context) {
+        DeviceId sourceDevice = context.inPacket().receivedFrom().deviceId();
+        String serial = deviceService.getDevice(sourceDevice).serialNumber();
+        return subsService.get(serial);
+    }
+
+    /** Returns the subscriber's first uni tag whose PON C-tag equals the packet's VLAN id. */
+    private UniTagInformation getUnitagInformationFromPacketContext(PacketContext context,
+                                                                    SubscriberAndDeviceInformation sub) {
+        // Yields null when no uni tag matches.
+        return sub.uniTagList().stream()
+                .filter(tag -> tag.getPonCTag().toShort() == context.inPacket().parsed().getVlanID())
+                .findFirst().orElse(null);
+    }
+
+    /** Removes the sessions learned on cp; works on a snapshot so concurrently deleted entries cannot NPE. */
+    private boolean removeSessionsByConnectPoint(ConnectPoint cp) {
+        boolean removed = false;
+        for (Map.Entry<MacAddress, PppoeSessionInfo> session : getSessionsMap().entrySet()) {
+            if (session.getValue().getClientCp().equals(cp)) {
+                sessionsMap.remove(session.getKey());
+                removed = true;
+            }
+        }
+        return removed;
+    }
+
+    /** Removes the sessions of a device; works on a snapshot so concurrently deleted entries cannot NPE. */
+    private boolean removeSessionsByDevice(DeviceId devid) {
+        boolean removed = false;
+        for (Map.Entry<MacAddress, PppoeSessionInfo> session : getSessionsMap().entrySet()) {
+            if (session.getValue().getClientCp().deviceId().equals(devid)) {
+                sessionsMap.remove(session.getKey());
+                removed = true;
+            }
+        }
+        return removed;
+    }
+
+    private class PppoeAgentPacketProcessor implements PacketProcessor {
+        /**
+         * Submits the received packet to {@code packetProcessorExecutor}; the packet is
+         * handled by {@code processInternal} when the executor runs the task.
+         *
+         * @param context packet context of the received packet
+         */
+        @Override
+        public void process(PacketContext context) {
+            packetProcessorExecutor.execute(() -> processInternal(context));
+        }
+
+        /**
+         * Filters the received packet and hands PPPoE discovery frames over to
+         * {@code processPppoedPacket}. Any other packet is ignored.
+         *
+         * @param context packet context of the received packet
+         */
+        private void processInternal(PacketContext context) {
+            Ethernet parsed = context.inPacket().parsed();
+            // Nothing to do for frames that could not be parsed or are not PPPoED.
+            if (parsed == null || parsed.getEtherType() != Ethernet.TYPE_PPPOED) {
+                return;
+            }
+
+            // Work on a copy, as the packet is modified before being relayed.
+            Ethernet packet = (Ethernet) parsed.clone();
+            processPppoedPacket(context, packet);
+        }
+
+        /**
+         * Handles a PPPoE discovery packet: identifies the client it belongs to, keeps the
+         * sessions map up to date, updates the counters and relays the packet to the server
+         * or to the client.
+         *
+         * @param context packet context of the received packet
+         * @param packet  Ethernet frame carrying the PPPoED payload
+         */
+        private void processPppoedPacket(PacketContext context, Ethernet packet) {
+            PPPoED pppoed = (PPPoED) packet.getPayload();
+            if (pppoed == null) {
+                log.warn("PPPoED payload is null");
+                return;
+            }
+
+            final byte pppoedCode = pppoed.getCode();
+            final short sessionId = pppoed.getSessionId();
+            final MacAddress clientMacAddress;
+            final ConnectPoint packetCp = context.inPacket().receivedFrom();
+            boolean serverMessage = false;
+
+            // Get the client MAC address
+            switch (pppoedCode) {
+                case PPPOED_CODE_PADT: {
+                    // PADT may come from either side: the direction is found by checking
+                    // which of the two MAC addresses has a stored session.
+                    if (sessionsMap.containsKey(packet.getDestinationMAC())) {
+                        clientMacAddress = packet.getDestinationMAC();
+                        serverMessage = true;
+                    } else if (sessionsMap.containsKey(packet.getSourceMAC())) {
+                        clientMacAddress = packet.getSourceMAC();
+                    } else {
+                        // In the unlikely case of receiving a PADT without an existing session
+                        log.warn("PADT received for unknown session. Dropping packet.");
+                        return;
+                    }
+                    break;
+                }
+                case PPPOED_CODE_PADI:
+                case PPPOED_CODE_PADR: {
+                    clientMacAddress = packet.getSourceMAC();
+                    break;
+                }
+                default: {
+                    clientMacAddress = packet.getDestinationMAC();
+                    serverMessage = true;
+                    break;
+                }
+            }
+
+            // Connect point of the subscriber: the stored client connect point for server
+            // messages, the connect point the packet was received from otherwise.
+            final ConnectPoint subscriberCp;
+            if (serverMessage) {
+                // Single lookup instead of containsKey() followed by get(): the entry cannot
+                // vanish between the check and the use.
+                PppoeSessionInfo knownSession = Versioned.valueOrNull(sessionsMap.get(clientMacAddress));
+                if (knownSession == null) {
+                    log.error("PPPoED message received from server without an existing session. Message not relayed.");
+                    return;
+                }
+                subscriberCp = knownSession.getClientCp();
+            } else {
+                subscriberCp = packetCp;
+            }
+
+            SubscriberAndDeviceInformation subsInfo = getSubscriber(subscriberCp);
+            if (subsInfo == null) {
+                log.error("No Sadis info for subscriber on connect point {}. Message not relayed.", subscriberCp);
+                return;
+            }
+
+            // The message type is resolved only when trace logging is enabled.
+            if (log.isTraceEnabled()) {
+                log.trace("{} received from {} at {} with client mac: {}",
+                        PPPoED.Type.getTypeByValue(pppoedCode),
+                        serverMessage ? "server" : "client", packetCp, clientMacAddress);
+                log.trace("PPPoED message received from {} at {} {}",
+                        serverMessage ? "server" : "client", packetCp, packet);
+            }
+
+            // In case of PADI, force the removal of the previous session entry
+            if ((pppoedCode == PPPOED_CODE_PADI) && (sessionsMap.containsKey(clientMacAddress))) {
+                log.trace("PADI received from MAC: {} with existing session data. Removing the existing data.",
+                        clientMacAddress);
+                sessionsMap.remove(clientMacAddress);
+            }
+
+            // Fill the session map entry
+            PppoeSessionInfo sessionInfo = Versioned.valueOrNull(sessionsMap.get(clientMacAddress));
+            if (sessionInfo == null) {
+                if (serverMessage) {
+                    // Server messages without a session were already rejected above.
+                    return;
+                }
+                // For client messages subsInfo is the entry of the receiving connect point,
+                // so it is reused instead of being looked up a second time.
+                ConnectPoint serverCp = getServerConnectPoint(packetCp.deviceId());
+                sessionInfo = new PppoeSessionInfo(packetCp, serverCp, pppoedCode,
+                        sessionId, subsInfo, clientMacAddress);
+                sessionsMap.put(clientMacAddress, sessionInfo);
+            }
+
+            switch (pppoedCode) {
+                case PPPOED_CODE_PADI:
+                case PPPOED_CODE_PADR:
+                    updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                            pppoedCode == PPPOED_CODE_PADI ? PppoeAgentCounterNames.PADI : PppoeAgentCounterNames.PADR);
+
+                    Ethernet padir = processPacketFromClient(context, packet, pppoed, sessionInfo, clientMacAddress);
+                    if (padir != null) {
+                        // Serialized once here only to measure the resulting frame.
+                        int frameLength = padir.serialize().length;
+                        if (frameLength <= pppoeMaxMtu) {
+                            forwardPacketToServer(padir, sessionInfo);
+                        } else {
+                            log.debug("MTU message size: {} exceeded configured pppoeMaxMtu: {}. Dropping Packet.",
+                                    frameLength, pppoeMaxMtu);
+                            forwardPacketToClient(errorToClient(packet, pppoed, "MTU message size exceeded"),
+                                                                sessionInfo, clientMacAddress);
+                            updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                                          PppoeAgentCounterNames.MTU_EXCEEDED);
+                        }
+                    }
+                    break;
+                case PPPOED_CODE_PADO:
+                case PPPOED_CODE_PADS:
+                    updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                            pppoedCode == PPPOED_CODE_PADO ? PppoeAgentCounterNames.PADO : PppoeAgentCounterNames.PADS);
+                    Ethernet pados = processPacketFromServer(packet, pppoed, sessionInfo, clientMacAddress);
+                    if (pados != null) {
+                        forwardPacketToClient(pados, sessionInfo, clientMacAddress);
+                    }
+                    break;
+                case PPPOED_CODE_PADT:
+                    // The PADT is relayed to the other side before the session id is checked.
+                    if (serverMessage) {
+                        updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                                      PppoeAgentCounterNames.PADT_FROM_SERVER);
+                        forwardPacketToClient(packet, sessionInfo, clientMacAddress);
+                    } else {
+                        updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                                      PppoeAgentCounterNames.PADT_FROM_CLIENT);
+                        forwardPacketToServer(packet, sessionInfo);
+                    }
+
+                    // The terminate reason is taken from the Generic-Error tag, when present.
+                    String reason = "";
+                    PPPoEDTag genericErrorTag = pppoed.getTags()
+                            .stream()
+                            .filter(tag -> tag.getType() == PPPOED_TAG_GENERIC_ERROR)
+                            .findFirst()
+                            .orElse(null);
+
+                    if (genericErrorTag != null) {
+                        reason = new String(genericErrorTag.getValue(), StandardCharsets.UTF_8);
+                    }
+                    log.debug("PADT sessionId:{}  client MAC:{}  Terminate reason:{}.",
+                            Integer.toHexString(sessionId & 0xFFFF), clientMacAddress, reason);
+
+                    // The stored session is removed only when the PADT carries its session id.
+                    boolean knownSessionId = sessionInfo.getSessionId() == sessionId;
+                    if (knownSessionId) {
+                        PppoeSessionInfo removedSessionInfo = Versioned
+                                .valueOrNull(sessionsMap.remove(clientMacAddress));
+                        if (removedSessionInfo != null) {
+                            post(new PppoeAgentEvent(PppoeAgentEvent.Type.TERMINATE, removedSessionInfo,
+                                                     packetCp, clientMacAddress, reason));
+                        }
+                    } else {
+                        log.warn("PADT received for a known MAC address but for unknown session.");
+                    }
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        /**
+         * Prepares a PADI/PADR packet received from a client to be relayed to the server:
+         * stores the packet code in the session, replaces any vendor-specific tag with the
+         * one built by the agent and sets the outer (S-tag) VLAN.
+         *
+         * @param context          packet context of the received packet
+         * @param ethernetPacket   Ethernet frame to be modified and relayed
+         * @param pppoed           PPPoED payload of the frame
+         * @param sessionInfo      session data of the client
+         * @param clientMacAddress MAC address of the client
+         * @return the frame to be sent to the server, or null when the packet must be dropped
+         */
+        private Ethernet processPacketFromClient(PacketContext context, Ethernet ethernetPacket, PPPoED pppoed,
+                                                 PppoeSessionInfo sessionInfo, MacAddress clientMacAddress) {
+            byte pppoedCode = pppoed.getCode();
+
+            sessionInfo.setPacketCode(pppoedCode);
+            sessionsMap.put(clientMacAddress, sessionInfo);
+
+            List<PPPoEDTag> originalTags = pppoed.getTags();
+
+            // Update Counters: a Generic-Error tag is counted once per packet.
+            if (originalTags != null) {
+                for (PPPoEDTag tag : originalTags) {
+                    if (tag.getType() == PPPOED_TAG_GENERIC_ERROR) {
+                        updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                PppoeAgentCounterNames.GENERIC_ERROR_FROM_CLIENT);
+                        break;
+                    }
+                }
+            }
+
+            // If this is a PADI packet, there'll be a START event.
+            if (pppoedCode == PPPOED_CODE_PADI) {
+                post(new PppoeAgentEvent(PppoeAgentEvent.Type.START, sessionInfo, sessionInfo.getClientCp(),
+                        clientMacAddress));
+            }
+
+            // Creates the vendor specific tag.
+            String circuitId = getCircuitId(sessionInfo.getClientCp());
+            if (circuitId == null) {
+                log.error("Failed to build circuid-id for client on connect point {}. Dropping packet.",
+                        sessionInfo.getClientCp());
+                return null;
+            }
+
+            // Checks whether the circuit-id is valid, if it's not it drops the packet.
+            if (!isCircuitIdValid(circuitId, sessionInfo.getSubscriber())) {
+                log.warn("Invalid circuit ID, dropping packet.");
+                PppoeAgentEvent invalidCidEvent = new PppoeAgentEvent(PppoeAgentEvent.Type.INVALID_CID, sessionInfo,
+                        context.inPacket().receivedFrom(), clientMacAddress);
+                post(invalidCidEvent);
+                return null;
+            }
+
+            String remoteId = sessionInfo.getSubscriber().remoteId();
+            byte[] vendorSpecificTag = new PPPoEDVendorSpecificTag(circuitId, remoteId).toByteArray();
+
+            // According to TR-101, R-149 (by Broadband Forum), agent must REPLACE vendor-specific tag that may come
+            // from client message with its own tag.
+            // The following block ensures that agent removes any previous vendor-specific tag.
+            if (originalTags != null) {
+                // Every vendor-specific tag is removed, so the length of each of them (not only
+                // of the first one) has to be deducted from the payload length.
+                // NOTE(review): only getLength() of each tag is deducted, as before; confirm
+                // whether the tag header bytes must be deducted as well.
+                int removedLength = 0;
+                for (PPPoEDTag tag : originalTags) {
+                    if (tag.getType() == PPPOED_TAG_VENDOR_SPECIFIC) {
+                        removedLength += tag.getLength();
+                    }
+                }
+
+                if (originalTags.removeIf(tag -> tag.getType() == PPPOED_TAG_VENDOR_SPECIFIC)) {
+                    pppoed.setPayloadLength((short) (pppoed.getPayloadLength() - removedLength));
+                }
+            }
+
+            pppoed.setTag(PPPOED_TAG_VENDOR_SPECIFIC, vendorSpecificTag);
+
+            ethernetPacket.setPayload(pppoed);
+            ethernetPacket.setQinQTPID(Ethernet.TYPE_VLAN);
+
+            // The S-tag comes from the service whose C-tag matches the VLAN of the received packet.
+            UniTagInformation uniTagInformation = getUnitagInformationFromPacketContext(context,
+                    sessionInfo.getSubscriber());
+            if (uniTagInformation == null) {
+                log.warn("Missing service information for connectPoint {} / cTag {}",
+                        context.inPacket().receivedFrom(),  context.inPacket().parsed().getVlanID());
+                return null;
+            }
+            ethernetPacket.setQinQVID(uniTagInformation.getPonSTag().toShort());
+            ethernetPacket.setPad(true);
+            return ethernetPacket;
+        }
+
+        private Ethernet processPacketFromServer(Ethernet ethernetPacket, PPPoED pppoed,
+                                                 PppoeSessionInfo sessionInfo, MacAddress clientMacAddress) {
+            // Update counters
+            List<PPPoEDTag> tags = pppoed.getTags();
+            for (PPPoEDTag tag : tags) {
+                switch (tag.getType()) {
+                    case PPPOED_TAG_GENERIC_ERROR:
+                        updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                PppoeAgentCounterNames.GENERIC_ERROR_FROM_SERVER);
+                        break;
+                    case PPPOED_TAG_SERVICE_NAME_ERROR:
+                        updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                PppoeAgentCounterNames.SERVICE_NAME_ERROR);
+                        break;
+                    case PPPOED_TAG_AC_SYSTEM_ERROR:
+                        updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                                PppoeAgentCounterNames.AC_SYSTEM_ERROR);
+                        break;
+                    default:
+                        break;
+                }
+            }
+
+            byte pppoedCode = pppoed.getCode();
+
+            if (pppoedCode == PPPOED_CODE_PADS) {
+                log.debug("PADS sessionId:{}  client MAC:{}", Integer.toHexString(pppoed.getSessionId() & 0xFFFF),
+                        clientMacAddress);
+                sessionInfo.setSessionId(pppoed.getSessionId());
+            }
+            sessionInfo.setPacketCode(pppoedCode);
+            sessionsMap.put(clientMacAddress, sessionInfo);
+
+            PppoeAgentEvent.Type eventType = pppoedCode == PPPOED_CODE_PADS ?
+                    PppoeAgentEvent.Type.SESSION_ESTABLISHED :
+                    PppoeAgentEvent.Type.NEGOTIATION;
+
+            post(new PppoeAgentEvent(eventType, sessionInfo, sessionInfo.getClientCp(), clientMacAddress));
+
+            ethernetPacket.setQinQVID(QINQ_VID_NONE);
+            ethernetPacket.setPad(true);
+            return ethernetPacket;
+        }
+
+        /**
+         * Increments a counter for the global scope and, when the subscriber is known,
+         * for the subscriber as well.
+         *
+         * @param sub         SADIS entry of the subscriber, may be null
+         * @param counterType counter to be incremented
+         */
+        private void updatePppoeAgentCountersStore(SubscriberAndDeviceInformation sub,
+                                                   PppoeAgentCounterNames counterType) {
+            // The global counter is always updated.
+            pppoeAgentCounters.incrementCounter(PppoeAgentEvent.GLOBAL_COUNTER, counterType);
+
+            if (sub != null) {
+                // Per-subscriber counter.
+                pppoeAgentCounters.incrementCounter(sub.id(), counterType);
+                return;
+            }
+            log.warn("Counter not updated as subscriber info not found.");
+        }
+
+        private String getCircuitId(ConnectPoint cp) {
+            return new CircuitIdBuilder()
+                    .setConnectPoint(cp)
+                    .setDeviceService(deviceService)
+                    .setSubsService(subsService)
+                    .setCircuitIdConfig(circuitIdfields)
+                    .addCustomSeparator(CircuitIdFieldName.AcessNodeIdentifier, " ")
+                    .addCustomSeparator(CircuitIdFieldName.Port, ":")
+                    .build();
+        }
+
+        /**
+         * Returns the connect point used to reach the PPPoE server for a device: the uplink
+         * of the OLT when {@code useOltUplink} is set, the selected server connect point
+         * otherwise.
+         *
+         * @param deviceId device the client packet was received from
+         * @return connect point towards the server
+         */
+        protected ConnectPoint getServerConnectPoint(DeviceId deviceId) {
+            if (useOltUplink) {
+                return getUplinkConnectPointOfOlt(deviceId);
+            }
+            return pppoeServerConnectPoint.get();
+        }
+
+        /**
+         * Checks a circuit-id against the one configured in the SADIS entry.
+         * The check passes when validation is disabled or when the entry has no circuit-id.
+         *
+         * @param cId   circuit-id built for the packet
+         * @param entry SADIS entry of the subscriber
+         * @return true if the circuit-id is accepted, false otherwise
+         */
+        private boolean isCircuitIdValid(String cId, SubscriberAndDeviceInformation entry) {
+            if (!enableCircuitIdValidation) {
+                log.debug("Circuit ID validation is disabled.");
+                return true;
+            }
+
+            if (entry == null) {
+                log.error("SubscriberAndDeviceInformation cannot be null.");
+                return false;
+            }
+
+            String configuredCircuitId = entry.circuitId();
+            if (configuredCircuitId == null || configuredCircuitId.isEmpty()) {
+                log.debug("Circuit ID not configured in SADIS entry. No check is done.");
+                return true;
+            }
+
+            if (!cId.equals(configuredCircuitId)) {
+                log.warn("Circuit ID in packet: {} did not match the configured entry on SADIS: {}.",
+                        cId, configuredCircuitId);
+                return false;
+            }
+
+            log.info("Circuit ID in packet: {} matched the configured entry on SADIS.", cId);
+            return true;
+        }
+
+        /**
+         * Emits a packet on the server connect point stored in the session and updates the
+         * packets-to-server counter. Nothing is sent when the session has no server connect point.
+         *
+         * @param packet      frame to be sent
+         * @param sessionInfo session data of the client
+         */
+        private void forwardPacketToServer(Ethernet packet, PppoeSessionInfo sessionInfo) {
+            ConnectPoint serverCp = sessionInfo.getServerCp();
+            if (serverCp == null) {
+                log.error("No connect point to send msg to PPPOE Server");
+                return;
+            }
+
+            log.info("Sending PPPOE packet to server at {}", serverCp);
+            TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                    .setOutput(serverCp.port())
+                    .build();
+            ByteBuffer frame = ByteBuffer.wrap(packet.serialize());
+            OutboundPacket outbound = new DefaultOutboundPacket(serverCp.deviceId(), treatment, frame);
+            if (log.isTraceEnabled()) {
+                log.trace("Relaying packet to pppoe server at {} {}", serverCp, packet);
+            }
+            packetService.emit(outbound);
+
+            updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                    PppoeAgentCounterNames.PPPOED_PACKETS_TO_SERVER);
+        }
+
+        /**
+         * Emits a packet on the client connect point stored in the session and updates the
+         * packets-from-server counter. The packet is dropped when the session has no client
+         * connect point.
+         *
+         * @param packet           frame to be sent
+         * @param sessionInfo      session data of the client
+         * @param clientMacAddress MAC address of the client, used for logging
+         */
+        private void forwardPacketToClient(Ethernet packet, PppoeSessionInfo sessionInfo, MacAddress clientMacAddress) {
+            ConnectPoint clientCp = sessionInfo.getClientCp();
+            if (clientCp != null) {
+                log.info("Sending PPPOE packet to client at {}", clientCp);
+                TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                        .setOutput(clientCp.port())
+                        .build();
+                ByteBuffer frame = ByteBuffer.wrap(packet.serialize());
+                OutboundPacket outbound = new DefaultOutboundPacket(clientCp.deviceId(), treatment, frame);
+                if (log.isTraceEnabled()) {
+                    log.trace("Relaying packet to pppoe client at {} {}", clientCp, packet);
+                }
+
+                packetService.emit(outbound);
+
+                updatePppoeAgentCountersStore(sessionInfo.getSubscriber(),
+                        PppoeAgentCounterNames.PPPOED_PACKETS_FROM_SERVER);
+            } else {
+                log.error("Dropping PPPOE packet, can't find a connectpoint for MAC {}", clientMacAddress);
+            }
+        }
+
+        /**
+         * Builds a PPPoED error reply for a client packet. The reply uses the response code
+         * of the received one (PADO for PADI, PADS for PADR), carries a Generic-Error tag
+         * with the given text and has the MAC addresses of the received frame swapped.
+         *
+         * @param packet    frame received from the client
+         * @param p         PPPoED payload of the received frame
+         * @param errString text placed in the Generic-Error tag
+         * @return the Ethernet frame to be sent back to the client
+         */
+        private Ethernet errorToClient(Ethernet packet, PPPoED p, String errString) {
+            PPPoED err = new PPPoED();
+            err.setVersion(p.getVersion());
+            err.setType(p.getType());
+            switch (p.getCode()) {
+                case PPPOED_CODE_PADI:
+                    err.setCode(PPPOED_CODE_PADO);
+                    break;
+                case PPPOED_CODE_PADR:
+                    err.setCode(PPPOED_CODE_PADS);
+                    break;
+                default:
+                    // Only codes without a mapped response keep the received code, so the
+                    // PADO/PADS selected above is no longer overwritten.
+                    err.setCode(p.getCode());
+                    break;
+            }
+            err.setSessionId(p.getSessionId());
+            err.setTag(PPPOED_TAG_GENERIC_ERROR, errString.getBytes(StandardCharsets.UTF_8));
+
+            Ethernet ethPacket = new Ethernet();
+            // A new Ethernet frame has no ether type set, so it is copied from the received frame.
+            ethPacket.setEtherType(packet.getEtherType());
+            ethPacket.setPayload(err);
+            ethPacket.setSourceMACAddress(packet.getDestinationMACAddress());
+            ethPacket.setDestinationMACAddress(packet.getSourceMACAddress());
+            // NOTE(review): the reply carries no VLAN id; confirm whether the VLAN of the
+            // received frame has to be copied for the reply to reach the client.
+            ethPacket.setQinQVID(QINQ_VID_NONE);
+            ethPacket.setPad(true);
+
+            return ethPacket;
+        }
+    }
+
+    private class InternalConfigListener implements NetworkConfigListener {
+        @Override
+        public void event(NetworkConfigEvent event) {
+
+            if ((event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
+                    event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED) &&
+                    event.configClass().equals(PppoeAgentConfig.class)) {
+                updateConfig();
+                log.info("Reconfigured");
+            }
+        }
+    }
+
+    /**
+     * Handles Mastership changes for the devices which connect to the PPPOE server.
+     */
+    private class InnerMastershipListener implements MastershipListener {
+        @Override
+        public void event(MastershipEvent event) {
+            if (!useOltUplink) {
+                if (pppoeServerConnectPoint.get() != null &&
+                        pppoeServerConnectPoint.get().deviceId().equals(event.subject())) {
+                    log.trace("Mastership Event recevived for {}", event.subject());
+                    // mastership of the device for our connect point has changed, reselect
+                    selectServerConnectPoint();
+                }
+            }
+        }
+    }
+
+    private class InnerDeviceListener implements DeviceListener {
+        @Override
+        public void event(DeviceEvent event) {
+            DeviceId devId = event.subject().id();
+
+            if (log.isTraceEnabled() && !event.type().equals(DeviceEvent.Type.PORT_STATS_UPDATED)) {
+                log.trace("Device Event received for {} event {}", event.subject(), event.type());
+            }
+
+            // Handle events from any other device
+            switch (event.type()) {
+                case PORT_UPDATED:
+                    if (!event.port().isEnabled()) {
+                        ConnectPoint cp = new ConnectPoint(devId, event.port().number());
+                        removeSessionsByConnectPoint(cp);
+                    }
+                    break;
+                case PORT_REMOVED:
+                    // Remove all entries related to this port from sessions map
+                    ConnectPoint cp = new ConnectPoint(devId, event.port().number());
+                    removeSessionsByConnectPoint(cp);
+                    break;
+                case DEVICE_REMOVED:
+                    // Remove all entries related to this device from sessions map
+                    removeSessionsByDevice(devId);
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    /**
+     * Posts the STATS_UPDATE events notified by the store. Events that carry a subscriber id
+     * are re-created with the session data of that subscriber before being posted.
+     */
+    private class InnerPppoeAgentStoreDelegate implements PppoeAgentStoreDelegate {
+        @Override
+        public void notify(PppoeAgentEvent event) {
+            if (!event.type().equals(PppoeAgentEvent.Type.STATS_UPDATE)) {
+                return;
+            }
+
+            if (event.getSubscriberId() == null) {
+                // Nothing to add to the event, post it as received.
+                post(event);
+                return;
+            }
+
+            // infuse the event with the allocation info before posting
+            PppoeSessionInfo info = Versioned.valueOrNull(
+                    sessionsMap.stream().filter(entry -> event.getSubscriberId()
+                            .equals(entry.getValue().value().getSubscriber().id()))
+                            .map(Map.Entry::getValue)
+                            .findFirst()
+                            .orElse(null));
+            if (info == null) {
+                log.debug("Not handling STATS_UPDATE event for session that no longer exists. {}.", event);
+                return;
+            }
+
+            post(new PppoeAgentEvent(event.type(), info, event.getCounterName(), event.getCounterValue(),
+                                     info.getClientMac(), event.getSubscriberId()));
+        }
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentConfig.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentConfig.java
new file mode 100644
index 0000000..5d14c76
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.collect.ImmutableSet;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.config.Config;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Network configuration of the PPPoE agent application.
+ */
+public class PppoeAgentConfig extends Config<ApplicationId> {
+    private static final String PPPOE_CONNECT_POINTS = "pppoeServerConnectPoints";
+    private static final String USE_OLT_ULPORT_FOR_PKT_INOUT = "useOltUplinkForServerPktInOut";
+
+    protected static final Boolean DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT = true;
+
+    @Override
+    public boolean isValid() {
+        return hasOnlyFields(PPPOE_CONNECT_POINTS, USE_OLT_ULPORT_FOR_PKT_INOUT);
+    }
+
+    /**
+     * Returns whether the app would use the uplink port of OLT for sending/receiving
+     * messages to/from the server.
+     *
+     * @return true if OLT uplink port is to be used, false otherwise; the default value
+     *         is returned when the field is not configured
+     */
+    public boolean getUseOltUplinkForServerPktInOut() {
+        if (object == null || !object.has(USE_OLT_ULPORT_FOR_PKT_INOUT)) {
+            return DEFAULT_USE_OLT_ULPORT_FOR_PKT_INOUT;
+        }
+        return object.path(USE_OLT_ULPORT_FOR_PKT_INOUT).asBoolean();
+    }
+
+    /**
+     * Returns the pppoe server connect points.
+     *
+     * @return pppoe server connect points; an empty set when none is configured, or null
+     *         when the configured value is not a list of valid connect points
+     */
+    public Set<ConnectPoint> getPppoeServerConnectPoint() {
+        if (object == null) {
+            return new HashSet<ConnectPoint>();
+        }
+
+        if (!object.has(PPPOE_CONNECT_POINTS)) {
+            return ImmutableSet.of();
+        }
+
+        JsonNode connectPointsNode = object.path(PPPOE_CONNECT_POINTS);
+        if (!connectPointsNode.isArray()) {
+            // A value that is not an array is handled like any other malformed entry,
+            // instead of failing with a ClassCastException on the cast below.
+            return null;
+        }
+
+        ImmutableSet.Builder<ConnectPoint> builder = ImmutableSet.builder();
+        ArrayNode arrayNode = (ArrayNode) connectPointsNode;
+        for (JsonNode jsonNode : arrayNode) {
+            String portName = jsonNode.asText(null);
+            if (portName == null) {
+                return null;
+            }
+            try {
+                builder.add(ConnectPoint.deviceConnectPoint(portName));
+            } catch (IllegalArgumentException e) {
+                // The text is not a valid device connect point.
+                return null;
+            }
+        }
+        return builder.build();
+    }
+}
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCounterNames.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCounterNames.java
new file mode 100644
index 0000000..0d58078
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCounterNames.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+/**
+ * Types of counters kept by the PPPoE agent.
+ */
+public enum PppoeAgentCounterNames {
+    /** Number of PADI messages received from client. */
+    PADI,
+
+    /** Number of PADO messages received from server. */
+    PADO,
+
+    /** Number of PADR messages received from client. */
+    PADR,
+
+    /** Number of PADS messages received from server. */
+    PADS,
+
+    /** Number of PADT messages received from server. */
+    PADT_FROM_SERVER,
+
+    /** Number of PADT messages received from client. */
+    PADT_FROM_CLIENT,
+
+    /** Number of PPPoED messages sent to server. */
+    PPPOED_PACKETS_TO_SERVER,
+
+    /** Number of PPPoED messages received from server. */
+    PPPOED_PACKETS_FROM_SERVER,
+
+    /** Number of MTU Exceeded errors generated by the PPPoED agent. */
+    MTU_EXCEEDED,
+
+    /** Number of Generic Errors received from server. */
+    GENERIC_ERROR_FROM_SERVER,
+
+    /** Number of Generic Errors received from client. */
+    GENERIC_ERROR_FROM_CLIENT,
+
+    /** Number of ServiceName Errors received from server. */
+    SERVICE_NAME_ERROR,
+
+    /** Number of AC-System Errors received from server. */
+    AC_SYSTEM_ERROR;
+
+    /**
+     * Supported types of PPPoED agent counters.
+     */
+    public static final Set<PppoeAgentCounterNames> SUPPORTED_COUNTERS = ImmutableSet.of(
+            PADI, PADO,
+            PADR, PADS,
+            PADT_FROM_SERVER, PADT_FROM_CLIENT,
+            PPPOED_PACKETS_TO_SERVER, PPPOED_PACKETS_FROM_SERVER,
+            MTU_EXCEEDED, GENERIC_ERROR_FROM_SERVER,
+            GENERIC_ERROR_FROM_CLIENT, SERVICE_NAME_ERROR,
+            AC_SYSTEM_ERROR);
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersIdentifier.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersIdentifier.java
new file mode 100644
index 0000000..cca163f
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersIdentifier.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import org.opencord.pppoeagent.PppoeAgentEvent;
+
+import java.util.Objects;
+
+/**
+ * Represents PPPoED agent counters identifier.
+ */
+public final class PppoeAgentCountersIdentifier {
+    final String counterClassKey;
+    final Enum<PppoeAgentCounterNames> counterTypeKey;
+
+    /**
+     * Builds an identifier that uses the global counter class for the given counter type.
+     *
+     * @param counterTypeKey the supported pppoe agent counter type being identified
+     */
+    public PppoeAgentCountersIdentifier(PppoeAgentCounterNames counterTypeKey) {
+        this(PppoeAgentEvent.GLOBAL_COUNTER, counterTypeKey);
+    }
+
+    /**
+     * Builds an identifier from the key pair [counterClass, counterType]. The counterClass is either
+     * the global class or a subscriber ID; the counterType is one of the supported pppoe counters.
+     *
+     * @param counterClassKey class the counter is assigned to (global or per subscriber)
+     * @param counterTypeKey the supported pppoe agent counter type being identified
+     */
+    public PppoeAgentCountersIdentifier(String counterClassKey, PppoeAgentCounterNames counterTypeKey) {
+        this.counterClassKey = counterClassKey;
+        this.counterTypeKey = counterTypeKey;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof PppoeAgentCountersIdentifier)) {
+            return false;
+        }
+        // Two identifiers match only when both the class key and the type key match.
+        final PppoeAgentCountersIdentifier that = (PppoeAgentCountersIdentifier) obj;
+        return Objects.equals(counterClassKey, that.counterClassKey)
+                && Objects.equals(counterTypeKey, that.counterTypeKey);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(counterClassKey, counterTypeKey);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("PppoeAgentCountersIdentifier{");
+        text.append("counterClassKey='").append(counterClassKey).append('\'');
+        text.append(", counterTypeKey=").append(counterTypeKey);
+        text.append('}');
+        return text.toString();
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersStore.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersStore.java
new file mode 100644
index 0000000..5b4ddfd
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentCountersStore.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.pppoeagent.impl;
+
+import org.onosproject.store.Store;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentStoreDelegate;
+
+/**
+ * Store for PPPoE Agent counters. A counter entry is identified by the pair [counterClass, counterType],
+ * where counterClass is either the global class or a subscriber ID and counterType is the PPPoE counter.
+ */
+public interface PppoeAgentCountersStore extends Store<PppoeAgentEvent, PppoeAgentStoreDelegate> {
+
+    String NAME = "PPPOE_Agent_stats";
+
+    /**
+     * Increments a PPPoE Agent counter, creating it if it does not exist yet.
+     *
+     * @param counterClass class of counters (global, per subscriber).
+     * @param counterType name of counter
+     */
+    void incrementCounter(String counterClass, PppoeAgentCounterNames counterType);
+
+    /**
+     * Sets the value of a PPPoE Agent counter.
+     *
+     * @param counterClass class of counters (global, per subscriber).
+     * @param counterType name of counter
+     * @param value the value to store in the counter
+     */
+    void setCounter(String counterClass, PppoeAgentCounterNames counterType, Long value);
+
+    /**
+     * Gets the current PPPoE Agent counter values.
+     *
+     * @return PPPoE Agent counter values
+     */
+    PppoeAgentStatistics getCounters();
+
+    /**
+     * Resets the counter values of a given counter class.
+     *
+     * @param counterClass class of counters (global, per subscriber).
+     */
+    void resetCounters(String counterClass);
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentStatistics.java b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentStatistics.java
new file mode 100644
index 0000000..dce1ac7
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/PppoeAgentStatistics.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Snapshot of PPPoE Agent statistics.
+ */
+public class PppoeAgentStatistics {
+    private final ImmutableMap<PppoeAgentCountersIdentifier, Long> counters;
+    private PppoeAgentStatistics(ImmutableMap<PppoeAgentCountersIdentifier, Long> counters) {
+        this.counters = counters;
+    }
+    /**
+     * Builds a statistics instance that holds no counters.
+     */
+    public PppoeAgentStatistics() {
+        this(ImmutableMap.of());
+    }
+    /**
+     * Looks up the value of the counter with the given ID, falling back to 0 when it is absent.
+     *
+     * @param id counter ID
+     * @return counter value
+     */
+    public long get(PppoeAgentCountersIdentifier id) {
+        final Long stored = counters.get(id);
+        return stored == null ? 0L : stored;
+    }
+    /**
+     * Exposes the counters held by this instance.
+     *
+     * @return map of counters
+     */
+    public Map<PppoeAgentCountersIdentifier, Long> counters() {
+        return counters;
+    }
+    /**
+     * Builds a statistics instance holding a copy of the given counter values.
+     *
+     * @param counters counters
+     * @return statistics
+     */
+    public static PppoeAgentStatistics withCounters(Map<PppoeAgentCountersIdentifier, Long> counters) {
+        return new PppoeAgentStatistics(ImmutableMap.copyOf(counters));
+    }
+    /**
+     * Sums this instance with the given one counter by counter and returns the result as a new instance.
+     *
+     * @param other other instance
+     * @return result
+     */
+    public PppoeAgentStatistics add(PppoeAgentStatistics other) {
+        ImmutableMap.Builder<PppoeAgentCountersIdentifier, Long> summed = ImmutableMap.builder();
+        Set<PppoeAgentCountersIdentifier> onlyInOther = Sets.newHashSet(other.counters.keySet());
+        for (Map.Entry<PppoeAgentCountersIdentifier, Long> entry : counters.entrySet()) {
+            summed.put(entry.getKey(), entry.getValue() + other.get(entry.getKey()));
+            onlyInOther.remove(entry.getKey());
+        }
+        for (PppoeAgentCountersIdentifier id : onlyInOther) {
+            summed.put(id, other.counters.get(id));
+        }
+        return new PppoeAgentStatistics(summed.build());
+    }
+    @Override
+    public String toString() {
+        MoreObjects.ToStringHelper text = MoreObjects.toStringHelper(this.getClass());
+        counters.forEach((key, count) -> text.add(key.toString(), count));
+        return text.toString();
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/SimplePppoeAgentCountersStore.java b/app/src/main/java/org/opencord/pppoeagent/impl/SimplePppoeAgentCountersStore.java
new file mode 100644
index 0000000..8be88b7
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/SimplePppoeAgentCountersStore.java
@@ -0,0 +1,305 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.impl;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SafeRecurringTask;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentStoreDelegate;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+
+import org.slf4j.Logger;
+import java.nio.charset.StandardCharsets;
+
+import java.util.Dictionary;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE_DEFAULT;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE;
+import static org.opencord.pppoeagent.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE_DEFAULT;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * PPPoE Agent Counters Manager Component.
+ */
+@Component(immediate = true,
+        property = {
+                PUBLISH_COUNTERS_RATE + ":Integer=" + PUBLISH_COUNTERS_RATE_DEFAULT,
+                SYNC_COUNTERS_RATE + ":Integer=" + SYNC_COUNTERS_RATE_DEFAULT,
+        }
+)
+public class SimplePppoeAgentCountersStore extends AbstractStore<PppoeAgentEvent, PppoeAgentStoreDelegate>
+        implements PppoeAgentCountersStore {
+    private static final String PPPOE_STATISTICS_LEADERSHIP = "pppoeagent-statistics";
+    private static final MessageSubject RESET_SUBJECT = new MessageSubject("pppoeagent-statistics-reset");
+
+    private final Logger log = getLogger(getClass());
+    private ConcurrentMap<PppoeAgentCountersIdentifier, Long> countersMap;
+
+    private EventuallyConsistentMap<NodeId, PppoeAgentStatistics> statistics;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService componentConfigService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterCommunicationService clusterCommunicationService;
+    protected int publishCountersRate = PUBLISH_COUNTERS_RATE_DEFAULT;
+    protected int syncCountersRate = SYNC_COUNTERS_RATE_DEFAULT;
+    KryoNamespace serializer = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(PppoeAgentStatistics.class)
+            .register(PppoeAgentCountersIdentifier.class)
+            .register(PppoeAgentCounterNames.class)
+            .register(ClusterMessage.class)
+            .register(MessageSubject.class)
+            .build();
+    private ScheduledExecutorService executor;
+    private ScheduledFuture<?> publisherTask;
+    private ScheduledFuture<?> syncTask;
+    private final AtomicBoolean dirty = new AtomicBoolean(true);
+
+    @Activate
+    public void activate(ComponentContext context) {
+        log.info("Activate PPPoE Agent Counters Manager");
+        countersMap = new ConcurrentHashMap<>();
+        componentConfigService.registerProperties(getClass());
+        // No executor exists yet, so this call only loads the configured rates; tasks start below
+        modified(context);
+        statistics = storageService.<NodeId, PppoeAgentStatistics>eventuallyConsistentMapBuilder()
+                .withName("pppoeagent-statistics")
+                .withSerializer(serializer)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                .build();
+        // Initialize counter values for the global counters
+        initCounters(PppoeAgentEvent.GLOBAL_COUNTER, statistics.get(clusterService.getLocalNode().id()));
+        syncStats();
+        leadershipService.runForLeadership(PPPOE_STATISTICS_LEADERSHIP);
+        executor = Executors.newScheduledThreadPool(1);
+        clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(serializer)::decode,
+                this::resetLocal, executor);
+        startSyncTask();
+        startPublishTask();
+    }
+
+    @Deactivate
+    public void deactivate() {
+        clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
+        leadershipService.withdraw(PPPOE_STATISTICS_LEADERSHIP);
+        stopPublishTask();
+        stopSyncTask();
+        executor.shutdownNow();
+        componentConfigService.unregisterProperties(getClass(), false);
+    }
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<String, Object> properties = context.getProperties();
+        String s = Tools.get(properties, PUBLISH_COUNTERS_RATE);
+        int oldPublishCountersRate = publishCountersRate;
+        publishCountersRate = Strings.isNullOrEmpty(s) ? PUBLISH_COUNTERS_RATE_DEFAULT
+                : Integer.parseInt(s.trim());
+        if (oldPublishCountersRate != publishCountersRate) {
+            stopPublishTask();
+            startPublishTask();
+        }
+        s = Tools.get(properties, SYNC_COUNTERS_RATE);
+        int oldSyncCountersRate = syncCountersRate;
+        syncCountersRate = Strings.isNullOrEmpty(s) ? SYNC_COUNTERS_RATE_DEFAULT
+                : Integer.parseInt(s.trim());
+        if (oldSyncCountersRate != syncCountersRate) {
+            stopSyncTask();
+            startSyncTask();
+        }
+    }
+    private ScheduledFuture<?> startTask(Runnable r, int rate) {
+        // modified() runs during activate() before the executor exists; activate() starts the tasks later
+        if (executor == null) {
+            return null;
+        }
+        return executor.scheduleAtFixedRate(SafeRecurringTask.wrap(r), 0, rate, TimeUnit.SECONDS);
+    }
+    private void stopTask(ScheduledFuture<?> task) {
+        if (task != null) {
+            task.cancel(true);
+        }
+    }
+    private void startSyncTask() {
+        syncTask = startTask(this::syncStats, syncCountersRate);
+    }
+    private void stopSyncTask() {
+        stopTask(syncTask);
+    }
+    private void startPublishTask() {
+        publisherTask = startTask(this::publishStats, publishCountersRate);
+    }
+    private void stopPublishTask() {
+        stopTask(publisherTask);
+    }
+
+    ImmutableMap<PppoeAgentCountersIdentifier, Long> getCountersMap() {
+        return ImmutableMap.copyOf(countersMap);
+    }
+
+    @Override
+    public PppoeAgentStatistics getCounters() {
+        return aggregate();
+    }
+
+    /**
+     * Initialize the supported counters map for the given counter class.
+     * @param counterClass class of counters (global, per subscriber)
+     * @param existingStats existing values to initialise the counters to, or null to start from zero
+     */
+    public void initCounters(String counterClass, PppoeAgentStatistics existingStats) {
+        checkNotNull(counterClass, "counter class can't be null");
+        for (PppoeAgentCounterNames counterType : PppoeAgentCounterNames.SUPPORTED_COUNTERS) {
+            PppoeAgentCountersIdentifier id = new PppoeAgentCountersIdentifier(counterClass, counterType);
+            countersMap.put(id, existingStats == null ? 0L : existingStats.get(id));
+        }
+    }
+
+    /**
+     * Inserts the counter entry if it is not already in the set otherwise increment the existing counter entry.
+     * @param counterClass class of counters (global, per subscriber)
+     * @param counterType name of counter
+     */
+    @Override
+    public void incrementCounter(String counterClass, PppoeAgentCounterNames counterType) {
+        checkNotNull(counterClass, "counter class can't be null");
+        if (!PppoeAgentCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
+            log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
+            return;
+        }
+        countersMap.merge(new PppoeAgentCountersIdentifier(counterClass, counterType), 1L, Long::sum);
+        dirty.set(true);
+    }
+
+    /**
+     * Reset the counters of the given counter class by broadcasting a reset message, this node included.
+     * @param counterClass class of counters (global, per subscriber)
+     */
+    @Override
+    public void resetCounters(String counterClass) {
+        checkNotNull(counterClass, "counter class can't be null");
+        byte[] payload = counterClass.getBytes(StandardCharsets.UTF_8);
+        ClusterMessage reset = new ClusterMessage(clusterService.getLocalNode().id(), RESET_SUBJECT, payload);
+        clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT, Serializer.using(serializer)::encode);
+    }
+    private void resetLocal(ClusterMessage m) {
+        String counterClass = new String(m.payload(), StandardCharsets.UTF_8);
+        for (PppoeAgentCounterNames counterType : PppoeAgentCounterNames.SUPPORTED_COUNTERS) {
+            PppoeAgentCountersIdentifier counterIdentifier =
+                    new PppoeAgentCountersIdentifier(counterClass, counterType);
+            countersMap.computeIfPresent(counterIdentifier, (key, counterValue) -> 0L);
+        }
+        dirty.set(true);
+        syncStats();
+    }
+
+    /**
+     * Inserts the counter entry if it is not already in the set otherwise update the existing counter entry.
+     * @param counterClass class of counters (global, per subscriber).
+     * @param counterType name of counter
+     * @param value counter value
+     */
+    @Override
+    public void setCounter(String counterClass, PppoeAgentCounterNames counterType, Long value) {
+        checkNotNull(counterClass, "counter class can't be null");
+        if (!PppoeAgentCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
+            log.error("Failed to set counter class {} of type {}", counterClass, counterType);
+            return;
+        }
+        countersMap.put(new PppoeAgentCountersIdentifier(counterClass, counterType), value);
+        dirty.set(true);
+        syncStats();
+    }
+
+    private PppoeAgentStatistics aggregate() {
+        return statistics.values().stream()
+                .reduce(new PppoeAgentStatistics(), PppoeAgentStatistics::add);
+    }
+    /** Creates a snapshot of the current in-memory statistics. */
+    private PppoeAgentStatistics snapshot() {
+        return PppoeAgentStatistics.withCounters(countersMap);
+    }
+    /** Syncs in-memory stats to the eventually consistent map. */
+    private void syncStats() {
+        // Clear the flag before taking the snapshot so a concurrent increment re-marks the store dirty
+        if (dirty.compareAndSet(true, false)) {
+            try {
+                statistics.put(clusterService.getLocalNode().id(), snapshot());
+            } catch (RuntimeException e) {
+                dirty.set(true);
+                throw e;
+            }
+        }
+    }
+    private void publishStats() {
+        // Only publish events if we are the cluster leader for PPPoE Agent stats
+        if (!Objects.equals(leadershipService.getLeader(PPPOE_STATISTICS_LEADERSHIP),
+                clusterService.getLocalNode().id())) {
+            return;
+        }
+        // Aggregate once so that every event of this round comes from the same view of the statistics
+        aggregate().counters().forEach((counterKey, counterValue) -> {
+            // Subscriber-specific counters have the subscriber ID set
+            String subscriberId = null;
+            if (!counterKey.counterClassKey.equals(PppoeAgentEvent.GLOBAL_COUNTER)) {
+                subscriberId = counterKey.counterClassKey;
+            }
+            if (delegate != null) {
+                delegate.notify(new PppoeAgentEvent(PppoeAgentEvent.Type.STATS_UPDATE, null,
+                        counterKey.counterTypeKey.toString(), counterValue, null, subscriberId));
+            }
+        });
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/impl/package-info.java b/app/src/main/java/org/opencord/pppoeagent/impl/package-info.java
new file mode 100644
index 0000000..84ceb21
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * PPPoE agent implementation.
+ */
+package org.opencord.pppoeagent.impl;
diff --git a/app/src/main/java/org/opencord/pppoeagent/rest/PppoeAgentWebResource.java b/app/src/main/java/org/opencord/pppoeagent/rest/PppoeAgentWebResource.java
new file mode 100644
index 0000000..56d5598
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/rest/PppoeAgentWebResource.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.pppoeagent.rest;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.util.Map;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.PPPoED;
+import org.onlab.util.ItemNotFoundException;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.Port;
+import org.onosproject.rest.AbstractWebResource;
+
+import org.opencord.pppoeagent.impl.PppoeAgentCounterNames;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersStore;
+import org.opencord.pppoeagent.impl.PppoeAgentCountersIdentifier;
+import org.opencord.pppoeagent.impl.PppoeAgentStatistics;
+import org.opencord.pppoeagent.PppoeAgentEvent;
+import org.opencord.pppoeagent.PppoeAgentService;
+import org.opencord.pppoeagent.PppoeSessionInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
+
+/**
+ * PppoeAgent web resource.
+ */
+@Path("pppoeagent-app")
+public class PppoeAgentWebResource extends AbstractWebResource {
+    private final ObjectNode root = mapper().createObjectNode();
+    private final ArrayNode node = root.putArray("entry");
+    private static final String SESSION_NOT_FOUND = "Session not found";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    DeviceService deviceService = AbstractShellCommand.get(DeviceService.class);
+
+    /**
+     * Returns the session info object of a single MAC address.
+     *
+     * @param mac Session MAC address
+     *
+     * @return 200 OK
+     */
+    @GET
+    @Path("/session/{mac}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getSubscriber(@PathParam("mac") String mac) {
+        MacAddress macAddress = MacAddress.valueOf(mac);
+        PppoeSessionInfo entry = get(PppoeAgentService.class).getSessionsMap().get(macAddress);
+        if (entry == null) {
+            throw new ItemNotFoundException(SESSION_NOT_FOUND);
+        }
+
+        try {
+            node.add(encodePppoeSessionInfo(entry, macAddress));
+            return ok(mapper().writeValueAsString(root)).build();
+        } catch (IllegalArgumentException e) {
+            log.error("Error while fetching PPPoE session info for MAC {} through REST API: {}", mac, e.getMessage());
+        } catch (JsonProcessingException e) {
+            log.error("Error assembling JSON response for PPPoE session info request for MAC {} " +
+                    "through REST API: {}", mac, e.getMessage());
+        }
+        return Response.status(INTERNAL_SERVER_ERROR).build();
+    }
+
+    /**
+     * Returns the session info objects of all known sessions.
+     *
+     * @return 200 OK
+     */
+    @GET
+    @Path("/session")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getSubscribers() {
+        try {
+            get(PppoeAgentService.class).getSessionsMap()
+                    .forEach((mac, entry) -> node.add(encodePppoeSessionInfo(entry, mac)));
+            return ok(mapper().writeValueAsString(root)).build();
+        } catch (Exception e) {
+            log.error("Error while fetching PPPoE sessions information through REST API: {}", e.getMessage());
+            return Response.status(INTERNAL_SERVER_ERROR).build();
+        }
+    }
+
+    private ObjectNode encodePppoeSessionInfo(PppoeSessionInfo entry, MacAddress macAddress) {
+        final ConnectPoint clientCp = entry.getClientCp();
+        final Port port = deviceService.getPort(clientCp);
+        final long uniIndex = (clientCp.port().toLong() & 0xF) + 1;
+        String subscriberId = "UNKNOWN";
+        if (port != null) {
+            subscriberId = port.annotations().value(AnnotationKeys.PORT_NAME);
+        }
+        ObjectNode encoded = mapper().createObjectNode();
+        encoded.put("macAddress", macAddress.toString());
+        encoded.put("sessionId", entry.getSessionId());
+        encoded.put("currentState", entry.getCurrentState());
+        encoded.put("lastReceivedPacket", PPPoED.Type.getTypeByValue(entry.getPacketCode()).name());
+        encoded.put("deviceId", clientCp.deviceId().toString());
+        encoded.put("portNumber", clientCp.port().toString());
+        encoded.put("portLabel", "uni-" + uniIndex);
+        encoded.put("subscriberId", subscriberId);
+        return encoded;
+    }
+
+    /**
+     * Returns the PPPoE Agent counters of the global context.
+     *
+     * @return 200 OK
+     */
+    @GET
+    @Path("/stats")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getPppoeStats() {
+        return getStats(PppoeAgentEvent.GLOBAL_COUNTER);
+    }
+
+    /**
+     * Returns the PPPoE Agent counters of a specific subscriber.
+     *
+     * @param subscriberId Id of subscriber.
+     *
+     * @return 200 OK
+     */
+    @GET
+    @Path("/stats/{subscriberId}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getPppoeSubscriberStats(@PathParam("subscriberId") String subscriberId) {
+        return getStats(subscriberId);
+    }
+    /**
+     * Builds the JSON response carrying the current counters of one counter class.
+     * @param key counter class: the global counter class or a subscriber ID
+     * @return 200 OK with the counters, or 500 if the JSON cannot be assembled
+     */
+    private Response getStats(String key) {
+        PppoeAgentStatistics stats = get(PppoeAgentCountersStore.class).getCounters();
+        try {
+            JsonNode body = buildPppoeCounterNodeObject(key, stats.counters());
+            return ok(mapper().writeValueAsString(body)).build();
+        } catch (JsonProcessingException e) {
+            log.error("Error while fetching PPPoE agent counter stats through REST API: {}", e.getMessage());
+            return Response.status(INTERNAL_SERVER_ERROR).build();
+        }
+    }
+
+    private JsonNode buildPppoeCounterNodeObject(String key, Map<PppoeAgentCountersIdentifier, Long> countersMap) {
+        ObjectNode entryNode = mapper().createObjectNode();
+        for (PppoeAgentCounterNames counterType : PppoeAgentCounterNames.SUPPORTED_COUNTERS) {
+            Long value = countersMap.get(new PppoeAgentCountersIdentifier(key, counterType));
+            if (value != null) {
+                entryNode.put(counterType.name(), String.valueOf(value));
+            }
+        }
+        return mapper().createObjectNode().set(key, entryNode);
+    }
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/pppoeagent/rest/package-info.java b/app/src/main/java/org/opencord/pppoeagent/rest/package-info.java
new file mode 100644
index 0000000..70a18d3
--- /dev/null
+++ b/app/src/main/java/org/opencord/pppoeagent/rest/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * REST interface for the PPPoE Agent.
+ */
+package org.opencord.pppoeagent.rest;
diff --git a/app/src/main/webapp/WEB-INF/web.xml b/app/src/main/webapp/WEB-INF/web.xml
new file mode 100644
index 0000000..9bb27a0
--- /dev/null
+++ b/app/src/main/webapp/WEB-INF/web.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2021-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee"
+         xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+         id="ONOS" version="2.5">
+    <display-name>PPPoE Agent REST API v1.0</display-name>
+    <!-- Jersey servlet container registering PppoeAgentWebResource as its provider class; loaded on startup. -->
+    <servlet>
+        <servlet-name>JAX-RS Service</servlet-name>
+        <servlet-class>org.glassfish.jersey.servlet.ServletContainer</servlet-class>
+        <init-param>
+            <param-name>jersey.config.server.provider.classnames</param-name>
+            <param-value>
+                org.opencord.pppoeagent.rest.PppoeAgentWebResource
+            </param-value>
+        </init-param>
+        <load-on-startup>1</load-on-startup>
+    </servlet>
+    <!-- Maps every request path of this web application to the JAX-RS servlet declared above. -->
+    <servlet-mapping>
+        <servlet-name>JAX-RS Service</servlet-name>
+        <url-pattern>/*</url-pattern>
+    </servlet-mapping>
+</web-app>